Skip to content

Commit

Permalink
feat: Max concurrent backfill tasks (#168)
Browse files Browse the repository at this point in the history
* feat: Max concurrent backfill tasks

You can now set the number of maximum concurrent backfill tasks for a stream using the Datastream API.

PiperOrigin-RevId: 530067890

Source-Link: googleapis/googleapis@b2c290f

Source-Link: googleapis/googleapis-gen@83c5413
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiODNjNTQxM2U1MzVjYjZmYTcxMDU0MWRjNmUxNjlhOGE0NGI4YTY1ZCJ9

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
gcf-owl-bot[bot] and gcf-owl-bot[bot] authored May 8, 2023
1 parent 608f119 commit f3ac2d7
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,9 @@ class CreatePrivateConnectionRequest(proto.Message):
The request ID must be a valid UUID with the
exception that zero UUID is not supported
(00000000-0000-0000-0000-000000000000).
force (bool):
Optional. If set to true, will skip
validations.
"""

parent: str = proto.Field(
Expand All @@ -1075,6 +1078,10 @@ class CreatePrivateConnectionRequest(proto.Message):
proto.STRING,
number=4,
)
force: bool = proto.Field(
proto.BOOL,
number=6,
)


class ListPrivateConnectionsRequest(proto.Message):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,12 @@ class BigQueryProfile(proto.Message):


class StaticServiceIpConnectivity(proto.Message):
r"""Static IP address connectivity."""
r"""Static IP address connectivity. Used when the source database
is configured to allow incoming connections from the Datastream
public IP addresses for the region specified in the connection
profile.
"""


class ForwardSshTunnelConnectivity(proto.Message):
Expand Down Expand Up @@ -792,15 +797,20 @@ class OracleSourceConfig(proto.Message):
Oracle objects to exclude from the stream.
max_concurrent_cdc_tasks (int):
Maximum number of concurrent CDC tasks. The
number should be non negative. If not set (or
set to 0), the system's default value will be
number should be non-negative. If not set (or
set to 0), the system's default value is used.
max_concurrent_backfill_tasks (int):
Maximum number of concurrent backfill tasks.
The number should be non-negative. If not set
(or set to 0), the system's default value is
used.
drop_large_objects (google.cloud.datastream_v1.types.OracleSourceConfig.DropLargeObjects):
Drop large object values.
This field is a member of `oneof`_ ``large_objects_handling``.
stream_large_objects (google.cloud.datastream_v1.types.OracleSourceConfig.StreamLargeObjects):
Stream large object values.
Stream large object values. NOTE: This
feature is currently experimental.
This field is a member of `oneof`_ ``large_objects_handling``.
"""
Expand All @@ -825,6 +835,10 @@ class StreamLargeObjects(proto.Message):
proto.INT32,
number=3,
)
max_concurrent_backfill_tasks: int = proto.Field(
proto.INT32,
number=4,
)
drop_large_objects: DropLargeObjects = proto.Field(
proto.MESSAGE,
number=100,
Expand Down Expand Up @@ -967,12 +981,18 @@ class PostgresqlSourceConfig(proto.Message):
PostgreSQL objects to exclude from the
stream.
replication_slot (str):
Required. The name of the logical replication
slot that's configured with the pgoutput plugin.
Required. Immutable. The name of the logical
replication slot that's configured with the
pgoutput plugin.
publication (str):
Required. The name of the publication that includes the set
of all tables that are defined in the stream's
include_objects.
max_concurrent_backfill_tasks (int):
Maximum number of concurrent backfill tasks.
The number should be non negative. If not set
(or set to 0), the system's default value will
be used.
"""

include_objects: "PostgresqlRdbms" = proto.Field(
Expand All @@ -993,6 +1013,10 @@ class PostgresqlSourceConfig(proto.Message):
proto.STRING,
number=4,
)
max_concurrent_backfill_tasks: int = proto.Field(
proto.INT32,
number=5,
)


class MysqlColumn(proto.Message):
Expand Down Expand Up @@ -1122,6 +1146,11 @@ class MysqlSourceConfig(proto.Message):
number should be non negative. If not set (or
set to 0), the system's default value will be
used.
max_concurrent_backfill_tasks (int):
Maximum number of concurrent backfill tasks.
The number should be non negative. If not set
(or set to 0), the system's default value will
be used.
"""

include_objects: "MysqlRdbms" = proto.Field(
Expand All @@ -1138,6 +1167,10 @@ class MysqlSourceConfig(proto.Message):
proto.INT32,
number=3,
)
max_concurrent_backfill_tasks: int = proto.Field(
proto.INT32,
number=4,
)


class SourceConfig(proto.Message):
Expand Down Expand Up @@ -1268,7 +1301,8 @@ class GcsDestinationConfig(proto.Message):
file_rotation_interval (google.protobuf.duration_pb2.Duration):
The maximum duration for which new events are
added before a file is closed and a new file is
created.
created. Values within the range of 15-60
seconds are allowed.
avro_file_format (google.cloud.datastream_v1.types.AvroFileFormat):
AVRO file format configuration.
Expand Down Expand Up @@ -1307,7 +1341,7 @@ class GcsDestinationConfig(proto.Message):


class BigQueryDestinationConfig(proto.Message):
r"""
r"""BigQuery destination configuration
This message has `oneof`_ fields (mutually exclusive fields).
For each oneof, at most one member field can be set at the same time.
Expand Down Expand Up @@ -1340,7 +1374,7 @@ class SingleTargetDataset(proto.Message):
Attributes:
dataset_id (str):
The dataset ID of the target dataset.
"""

dataset_id: str = proto.Field(
Expand All @@ -1354,7 +1388,8 @@ class SourceHierarchyDatasets(proto.Message):
Attributes:
dataset_template (google.cloud.datastream_v1.types.BigQueryDestinationConfig.SourceHierarchyDatasets.DatasetTemplate):
The dataset template to use for dynamic
dataset creation.
"""

class DatasetTemplate(proto.Message):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
],
"language": "PYTHON",
"name": "google-cloud-datastream",
"version": "1.5.1"
"version": "0.1.0"
},
"snippets": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
],
"language": "PYTHON",
"name": "google-cloud-datastream",
"version": "1.5.1"
"version": "0.1.0"
},
"snippets": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class datastreamCallTransformer(cst.CSTTransformer):
CTRL_PARAMS: Tuple[str] = ('retry', 'timeout', 'metadata')
METHOD_TO_PARAMS: Dict[str, Tuple[str]] = {
'create_connection_profile': ('parent', 'connection_profile_id', 'connection_profile', 'request_id', 'validate_only', 'force', ),
'create_private_connection': ('parent', 'private_connection_id', 'private_connection', 'request_id', ),
'create_private_connection': ('parent', 'private_connection_id', 'private_connection', 'request_id', 'force', ),
'create_route': ('parent', 'route_id', 'route', 'request_id', ),
'create_stream': ('parent', 'stream_id', 'stream', 'request_id', 'validate_only', 'force', ),
'delete_connection_profile': ('name', 'request_id', ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10421,6 +10421,7 @@ def test_create_stream_rest(request_type):
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
"drop_large_objects": {},
"stream_large_objects": {},
},
Expand Down Expand Up @@ -10450,6 +10451,7 @@ def test_create_stream_rest(request_type):
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
},
"postgresql_source_config": {
"include_objects": {
Expand Down Expand Up @@ -10479,6 +10481,7 @@ def test_create_stream_rest(request_type):
"exclude_objects": {},
"replication_slot": "replication_slot_value",
"publication": "publication_value",
"max_concurrent_backfill_tasks": 3076,
},
},
"destination_config": {
Expand Down Expand Up @@ -10766,6 +10769,7 @@ def test_create_stream_rest_bad_request(
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
"drop_large_objects": {},
"stream_large_objects": {},
},
Expand Down Expand Up @@ -10795,6 +10799,7 @@ def test_create_stream_rest_bad_request(
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
},
"postgresql_source_config": {
"include_objects": {
Expand Down Expand Up @@ -10824,6 +10829,7 @@ def test_create_stream_rest_bad_request(
"exclude_objects": {},
"replication_slot": "replication_slot_value",
"publication": "publication_value",
"max_concurrent_backfill_tasks": 3076,
},
},
"destination_config": {
Expand Down Expand Up @@ -10996,6 +11002,7 @@ def test_update_stream_rest(request_type):
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
"drop_large_objects": {},
"stream_large_objects": {},
},
Expand Down Expand Up @@ -11025,6 +11032,7 @@ def test_update_stream_rest(request_type):
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
},
"postgresql_source_config": {
"include_objects": {
Expand Down Expand Up @@ -11054,6 +11062,7 @@ def test_update_stream_rest(request_type):
"exclude_objects": {},
"replication_slot": "replication_slot_value",
"publication": "publication_value",
"max_concurrent_backfill_tasks": 3076,
},
},
"destination_config": {
Expand Down Expand Up @@ -11319,6 +11328,7 @@ def test_update_stream_rest_bad_request(
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
"drop_large_objects": {},
"stream_large_objects": {},
},
Expand Down Expand Up @@ -11348,6 +11358,7 @@ def test_update_stream_rest_bad_request(
},
"exclude_objects": {},
"max_concurrent_cdc_tasks": 2550,
"max_concurrent_backfill_tasks": 3076,
},
"postgresql_source_config": {
"include_objects": {
Expand Down Expand Up @@ -11377,6 +11388,7 @@ def test_update_stream_rest_bad_request(
"exclude_objects": {},
"replication_slot": "replication_slot_value",
"publication": "publication_value",
"max_concurrent_backfill_tasks": 3076,
},
},
"destination_config": {
Expand Down Expand Up @@ -13579,6 +13591,7 @@ def test_create_private_connection_rest_required_fields(
# Check that path parameters and body parameters are not mixing in.
assert not set(unset_fields) - set(
(
"force",
"private_connection_id",
"request_id",
)
Expand Down Expand Up @@ -13645,6 +13658,7 @@ def test_create_private_connection_rest_unset_required_fields():
assert set(unset_fields) == (
set(
(
"force",
"privateConnectionId",
"requestId",
)
Expand Down

0 comments on commit f3ac2d7

Please sign in to comment.