Skip to content

Commit

Permalink
Merge pull request #27 from superstreamlabs/2.4.0-beta
Browse files Browse the repository at this point in the history
release
  • Loading branch information
idanasulin2706 authored Sep 25, 2024
2 parents 4108317 + 142306a commit 93c6525
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pipeline {
}
withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) {
sh """
gh release create $versionTag /tmp/kafka-clients/kafka-client-${env.versionTag}.tar.gz --generate-notes
gh release create $versionTag dist/superstream_confluent_kafka-${env.versionTag}.tar.gz --generate-notes
"""
}
}
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# Superstream-Kafka

## Configuration for Superstream SDK

To leverage the full capabilities of the Superstream SDK, it is essential to set the environment variables listed in the table below before initializing the SDK. Without setting up these environment variables, the SDK will function as a standard Kafka SDK.

| Environment Variable | Default | Required | Description |
|-------------------------------------|------------------|-----------|-------------------------------------------------------------------------------------------------------|
| `SUPERSTREAM_HOST` | - | Yes | Specify the host URL of the Superstream service to connect to the appropriate Superstream environment. |
| `SUPERSTREAM_TOKEN` | - | No | This authentication token is required when the engine is configured to work with local authentication, to securely access the Superstream services. |
| `SUPERSTREAM_TAGS` | Empty string | No | Set this variable to tag the client. This is a string - comma-separated list of tags. |
| `SUPERSTREAM_DEBUG` | False | No | Set this variable to true to enable Superstream logs. By default, there will not be any Superstream related logs. |
| `SUPERSTREAM_RESPONSE_TIMEOUT` | 3000 | No | Set this variable to specify the timeout in milliseconds for the Superstream service response. |

> [!IMPORTANT]
> __Ensure that these environment variables are properly configured in your system to fully utilize the enhanced features offered by Superstream SDK.__
Confluent's Python Client for Apache Kafka<sup>TM</sup>
=======================================================

Expand Down
4 changes: 2 additions & 2 deletions patch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ options:
The `src`, `prefix`, and `output` are required arguments. The `version` argument is optional. An example command is shown below:

```sh
python3 patch.py --src "/input/path/to/wheel/created/using/pdm" --output "/output/path/to/patched/pkgs" --prefix "superstream-confluent-kafka-beta-2.4.0.1"
```

**The value of `--prefix` should be the same as the name of the package that will be patched, followed by the version number.** For example, if the package name is `superstream-confluent-kafka` and the version is `2.4.0.1`, the value of `--prefix` should be `superstream-confluent-kafka-beta-2.4.0.1`.
46 changes: 35 additions & 11 deletions src/confluent_kafka/superstream/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


class SdkInfo:
    """Identity of this SDK as reported to the Superstream engine."""

    VERSION = "2.4.0.1"  # keep in sync with version.conf
    LANGUAGE = "python"


Expand All @@ -22,8 +22,8 @@ class SuperstreamKeys:


class SuperstreamValues:
    """Internal constants used by the Superstream client."""

    MAX_TIME_WAIT_CAN_START = 60 * 10  # in seconds
    DEFAULT_SUPERSTREAM_TIMEOUT = 3000  # in milliseconds
    OPTIMIZED_CONFIGURATION_KEY = "optimized_configuration"
    INTERNAL_USERNAME = "superstream_internal"

Expand All @@ -45,14 +45,38 @@ class SuperstreamSubjects:
START_CLIENT = "internal.startClient.%s"


class EnvVarsMeta(type):
    """Metaclass exposing Superstream environment variables as class-level
    properties.

    Each access re-reads the process environment, so changes made after
    import are always reflected (the old class-attribute version froze
    values at import time).
    """

    @property
    def SUPERSTREAM_HOST(cls) -> str:
        # May be None when the variable is unset; callers treat that as
        # "Superstream disabled".
        return os.getenv("SUPERSTREAM_HOST")

    @property
    def SUPERSTREAM_TOKEN(cls) -> str:
        # Only needed when the engine uses local authentication.
        return os.getenv("SUPERSTREAM_TOKEN", "no-auth")

    @property
    def SUPERSTREAM_LEARNING_FACTOR(cls) -> int:
        return int(os.getenv("SUPERSTREAM_LEARNING_FACTOR", 20))

    @property
    def SUPERSTREAM_TAGS(cls) -> str:
        # Comma-separated list of client tags; empty string means untagged.
        return os.getenv("SUPERSTREAM_TAGS", "")

    @property
    def SUPERSTREAM_DEBUG(cls) -> bool:
        # BUGFIX: the previous `... in ("true")` was a substring test on the
        # string "true" (parentheses alone do not make a tuple), so values
        # like "rue" counted as enabled. Compare for equality instead.
        return os.getenv("SUPERSTREAM_DEBUG", "False").lower() == "true"

    @property
    def SUPERSTREAM_RESPONSE_TIMEOUT(cls) -> float:
        # Milliseconds; defaults to the shared constant so code and docs agree.
        return float(os.getenv("SUPERSTREAM_RESPONSE_TIMEOUT", SuperstreamValues.DEFAULT_SUPERSTREAM_TIMEOUT))

    @property
    def SUPERSTREAM_REDUCTION_ENABLED(cls) -> bool:
        # Intentionally exact match: only the literal string "true" enables it.
        return os.getenv("SUPERSTREAM_REDUCTION_ENABLED", "") == "true"


class EnvVars(metaclass=EnvVarsMeta):
    """Accessor for Superstream configuration environment variables.

    The properties live on the metaclass, so they are read on the class
    itself (e.g. ``EnvVars.SUPERSTREAM_HOST``) without instantiation.
    """


class KafkaProducerConfigKeys:
Expand Down
28 changes: 16 additions & 12 deletions src/confluent_kafka/superstream/consumer_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ def __init__(self, config: Dict):

def set_full_configuration(self, config: Dict[str, Any]):
    """Enrich *config* with consumer defaults and report it to Superstream.

    No-op when no Superstream connection has been established, so the
    client still works as a plain Kafka consumer.
    """
    full_config = KafkaUtil.enrich_consumer_config(config)
    if self.superstream:
        self.superstream.set_full_client_configs(full_config)

@property
def superstream(self) -> Superstream:
    """The active Superstream connection, or ``None`` when not configured."""
    connection = self._superstream_config_.get(SuperstreamKeys.CONNECTION)
    return connection

def wait_for_superstream_configs_sync(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Wait for Superstream-optimized configs, falling back to *config*.

    Returns *config* unchanged when there is no Superstream connection.
    """
    connection = self.superstream
    if not connection:
        return config
    return connection.wait_for_superstream_configs_sync(config)

def __update_topic_partitions(self, message):
    """Record the (topic, partition) of a consumed message for reporting.

    Silently skipped when no Superstream connection exists.
    """
    if self.superstream is None:
        return
    topic = message.topic()
    partition = message.partition()
    self.superstream.update_topic_partitions(topic, partition)

def poll(self, message) -> Any:
if message is None:
Expand All @@ -49,7 +54,6 @@ def __intercept(self, message: Any) -> Any:
return message

async def __deserialize(self, message: Any) -> Any:
superstream: Superstream = self._superstream_config_.get(SuperstreamKeys.CONNECTION)
message_value = message.value()
headers = message.headers()

Expand All @@ -73,22 +77,22 @@ async def __deserialize(self, message: Any) -> Any:
check_interval = 5

for _ in range(0, wait_time, check_interval):
if superstream.superstream_ready:
if self.superstream and self.superstream.superstream_ready:
break
time.sleep(check_interval)

if not superstream.superstream_ready:
if not self.superstream or not self.superstream.superstream_ready:
sys.stderr.write(
"superstream: cannot connect with superstream and consume message that was modified by superstream"
)
return message

descriptor = superstream.consumer_proto_desc_map.get(schema_id)
descriptor = self.superstream.consumer_proto_desc_map.get(schema_id)
if not descriptor:
await superstream.send_get_schema_request(schema_id)
descriptor = superstream.consumer_proto_desc_map.get(schema_id)
await self.superstream.send_get_schema_request(schema_id)
descriptor = self.superstream.consumer_proto_desc_map.get(schema_id)
if not descriptor:
await superstream.handle_error(f"error getting schema with id: {schema_id}")
await self.superstream.handle_error(f"error getting schema with id: {schema_id}")
return message

try:
Expand All @@ -98,5 +102,5 @@ async def __deserialize(self, message: Any) -> Any:
message.set_value(deserialized_msg.encode("utf-8"))
return message
except Exception as e:
await superstream.handle_error(f"error deserializing data: {e!s}")
await self.superstream.handle_error(f"error deserializing data: {e!s}")
return message
8 changes: 7 additions & 1 deletion src/confluent_kafka/superstream/producer_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ def set_producer_handler(self, producer_handler: Callable):

def set_full_configuration(self, config: Dict[str, Any]):
    """Enrich *config* with producer defaults and report it to Superstream.

    No-op when no Superstream connection has been established, so the
    client still works as a plain Kafka producer.
    """
    full_config = KafkaUtil.enrich_producer_config(config)
    if self.superstream:
        self.superstream.set_full_client_configs(full_config)

def wait_for_superstream_configs_sync(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """Wait for Superstream-optimized configs, falling back to *config*.

    Returns *config* unchanged when there is no Superstream connection.
    """
    connection = self.superstream
    if not connection:
        return config
    return connection.wait_for_superstream_configs_sync(config)

@property
def superstream(self) -> Superstream:
Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/superstream/superstream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class SuperstreamConsumer(_ConsumerImpl):
def __init__(self, config: Dict):
    """Create a consumer, first letting Superstream optimize *config*.

    The interceptor handles the missing-connection case internally, so
    this degrades to a plain Kafka consumer when Superstream is
    unavailable.
    """
    self._interceptor = SuperstreamConsumerInterceptor(config)
    config = self._interceptor.wait_for_superstream_configs_sync(config)
    self._interceptor.set_full_configuration(config)
    super().__init__(config)

Expand Down
2 changes: 1 addition & 1 deletion src/confluent_kafka/superstream/superstream_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class SuperstreamProducer(_ProducerImpl):
def __init__(self, config: Dict):
    """Create a producer, first letting Superstream optimize *config*.

    The interceptor handles the missing-connection case internally, so
    this degrades to a plain Kafka producer when Superstream is
    unavailable. The base ``produce`` is handed to the interceptor so it
    can wrap outgoing messages.
    """
    self._interceptor = SuperstreamProducerInterceptor(config)
    config = self._interceptor.wait_for_superstream_configs_sync(config)
    self._interceptor.set_full_configuration(config)
    super().__init__(config)
    self._interceptor.set_producer_handler(super().produce)
Expand Down
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.4.17
2.4.18
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.4.0
2.4.0.1

0 comments on commit 93c6525

Please sign in to comment.