Example 1 - Kafka Publish - Consume
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#KAFKA Producer | |
#Function Definition | |
from kafka import KafkaConsumer, KafkaProducer | |
def connect_kafka_producer(): | |
_producer = None | |
try: | |
_producer = KafkaProducer(bootstrap_servers=['ip-XX-XX-XX-XX:9092'], api_version=(0, 10)) | |
except Exception as ex: | |
print('Exception while connecting Kafka') | |
print(str(ex)) | |
finally: | |
return _producer | |
#Kafka Publish message function | |
#Function Definition | |
def publish_message(producer_instance, topic_name, key, value): | |
try: | |
key_bytes = bytes(key, encoding='utf-8') | |
value_bytes = bytes(value, encoding='utf-8') | |
producer_instance.send(topic_name, key=key_bytes, value=value_bytes) | |
producer_instance.flush() | |
print('Cab Booking Request published successfully.') | |
except Exception as ex: | |
print('Exception in publishing message') | |
print(str(ex)) | |
#Producer and send messages for a topic | |
#Function Invocation | |
kafka_producer = connect_kafka_producer() | |
for i in range(1,10): | |
for j in range(1,10): | |
print(i) | |
print(j) | |
message = str(i) + ',' + str(j) | |
print(message) | |
publish_message(kafka_producer, 'cab_request', 'UberGo',message) | |
#Read messages from topic | |
from kafka import KafkaConsumer | |
consumer = KafkaConsumer(bootstrap_servers='ip-XX-XX-XX-XX:9092', auto_offset_reset='earliest') | |
consumer.subscribe(['cab_request']) | |
print(consumer.partitions_for_topic('cab_request')) | |
for message in consumer: | |
print (message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#KAFKA Producer | |
#Function Definition | |
from kafka import KafkaConsumer, KafkaProducer | |
def connect_kafka_producer(): | |
_producer = None | |
try: | |
_producer = KafkaProducer(bootstrap_servers=['ip-XX-XX-XX-XX:9092'], api_version=(0, 10)) | |
except Exception as ex: | |
print('Exception while connecting Kafka') | |
print(str(ex)) | |
finally: | |
return _producer | |
#Kafka Publish message function | |
#Function Definition | |
def publish_message(producer_instance, topic_name, key, value): | |
try: | |
key_bytes = bytes(key, encoding='utf-8') | |
value_bytes = bytes(value, encoding='utf-8') | |
producer_instance.send(topic_name, key=key_bytes, value=value_bytes) | |
producer_instance.flush() | |
print('Cab Booking Request published successfully.') | |
except Exception as ex: | |
print('Exception in publishing message') | |
print(str(ex)) | |
#Producer and send messages for a topic | |
#Function Invocation | |
kafka_producer = connect_kafka_producer() | |
for i in range(1,10): | |
for j in range(1,10): | |
print(i) | |
print(j) | |
message = 'Publish to spark from kafka' + str(i) + ',' + str(j) | |
print(message) | |
publish_message(kafka_producer, 'cab_request', 'UberGo',message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import sys | |
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6' | |
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6' | |
from pyspark import SparkContext, SparkConf | |
from pyspark.streaming import StreamingContext | |
from pyspark.streaming.kafka import KafkaUtils | |
from pyspark.sql import SparkSession | |
from pyspark.streaming.kafka import TopicAndPartition | |
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 pyspark-shell' | |
conf = SparkConf().setAppName("Kafka-Spark") | |
sc = SparkContext.getOrCreate(conf) | |
#2 batches | |
stream=StreamingContext(sc,2) | |
print(sc.version) | |
kafkaBrokers = {"metadata.broker.list": "ip-XX-XX-XX-XX:9092"} | |
topic = "cab_request" | |
kafkastream = KafkaUtils.createDirectStream(stream, [topic],kafkaBrokers) | |
lines = kafkastream.map(lambda x: x[0]) | |
messagedata = kafkastream.map(lambda x: x[1]) | |
lines.pprint() | |
messagedata.pprint() | |
stream.start() | |
stream.awaitTermination() |
Happy Learning!!!
No comments:
Post a Comment