Skip to content

Commit

Permalink
Merge pull request #55389 from Nate-Watts/port-50579
Browse files Browse the repository at this point in the history
Port 50579
  • Loading branch information
dwoz authored Jan 2, 2020
2 parents 89d97b2 + bfedbef commit bbc6d22
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions salt/returners/kafka_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,68 @@
'''
Return data to a Kafka topic
:maintainer: Christer Edwards (christer.edwards@gmail.com)
:maturity: 0.1
:depends: kafka-python
:maintainer: Justin Desilets (justin.desilets@gmail.com)
:maturity: 20181119
:depends: confluent-kafka
:platform: all
To enable this returner install kafka-python and enable the following settings
in the minion config:
To enable this returner install confluent-kafka and enable the following
settings in the minion config:
returner.kafka.hostnames:
- "server1"
- "server2"
- "server3"
returner.kafka.bootstrap:
- "server1:9092"
- "server2:9092"
- "server3:9092"
returner.kafka.topic: 'topic'
To use the kafka returner, append '--return kafka' to the Salt command, eg;
To use the kafka returner, append `--return kafka` to the Salt command, eg;
salt '*' test.ping --return kafka
'''

# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import logging
import salt.utils.json

# Import third-party libs
try:
from kafka import KafkaClient, SimpleProducer
from confluent_kafka import Producer
HAS_KAFKA = True
except ImportError:
HAS_KAFKA = False

log = logging.getLogger(__name__)


__virtualname__ = 'kafka'


def __virtual__():
if not HAS_KAFKA:
return False, 'Could not import kafka returner; kafka-python is not installed.'
return False, 'Could not import kafka returner; confluent-kafka is not installed.'
return __virtualname__


def _get_conn(ret=None):
def _get_conn():
'''
Return a kafka connection
'''
if __salt__['config.option']('returner.kafka.hostnames'):
hostnames = __salt__['config.option']('returner.kafka.hostnames')
return KafkaClient(hostnames)
if __salt__['config.option']('returner.kafka.bootstrap'):
bootstrap = ','.join(__salt__['config.option']('returner.kafka.bootstrap'))
else:
log.error('Unable to find kafka returner config option: hostnames')
log.error('Unable to find kafka returner config option: bootstrap')
return None
return bootstrap


def _close_conn(conn):
'''
Close the kafka connection
'''
conn.close()
def _delivery_report(err, msg):
''' Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). '''
if err is not None:
log.error('Message delivery failed: %s', err)
else:
log.debug('Message delivered to %s [%s]', msg.topic(), msg.partition())


def returner(ret):
Expand All @@ -72,10 +74,11 @@ def returner(ret):
if __salt__['config.option']('returner.kafka.topic'):
topic = __salt__['config.option']('returner.kafka.topic')

conn = _get_conn(ret)
producer = SimpleProducer(conn)
producer.send_messages(topic, salt.utils.json.dumps(ret))
conn = _get_conn()
producer = Producer({'bootstrap.servers': conn})
producer.poll(0)
producer.produce(topic, salt.utils.json.dumps(ret), str(ret).encode('utf-8'), callback=_delivery_report)

_close_conn(conn)
producer.flush()
else:
log.error('Unable to find kafka returner config option: topic')

0 comments on commit bbc6d22

Please sign in to comment.