"No one is harder on a talented person than the person themselves" - Linda Wilkinson ; "Trust your guts and don't follow the herd" ; "Validate direction not destination" ;

April 02, 2019

Day #232 - Kafka + Spark Integration - Big Data Setup - Part I

Experimenting with Kafka and Spark using PySpark

Example 1 - Kafka Publish - Consume
#KAFKA Producer
#Function Definition
from kafka import KafkaConsumer, KafkaProducer
def connect_kafka_producer():
    """Create a KafkaProducer for the demo broker.

    Returns:
        The constructed ``KafkaProducer``, or ``None`` when construction
        raised an ``Exception`` (the error is printed, not re-raised).
    """
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['ip-XX-XX-XX-XX:9092'],
            api_version=(0, 10),
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return after the handler instead of from a ``finally`` clause: a
    # ``return`` inside ``finally`` would also discard KeyboardInterrupt and
    # SystemExit, which this best-effort helper should not swallow.
    return _producer
#Kafka Publish message function
#Function Definition
def publish_message(producer_instance, topic_name, key, value):
    """Send one UTF-8 encoded key/value record to ``topic_name`` and flush.

    Any ``Exception`` raised while encoding, sending or flushing is printed
    instead of propagated, so the function always returns ``None``.
    """
    try:
        # Encode both fields before touching the producer.
        encoded_key, encoded_value = [
            bytes(text, encoding='utf-8') for text in (key, value)
        ]
        producer_instance.send(
            topic_name, key=encoded_key, value=encoded_value
        )
        producer_instance.flush()
        print('Cab Booking Request published successfully.')
    except Exception as error:
        print('Exception in publishing message', str(error), sep='\n')
# Connect once, then publish one "i,j" message for every pair with i and j
# each running from 1 to 9.
kafka_producer = connect_kafka_producer()
for i, j in ((outer, inner) for outer in range(1, 10) for inner in range(1, 10)):
    print(i)
    print(j)
    message = ','.join((str(i), str(j)))
    print(message)
    publish_message(kafka_producer, 'cab_request', 'UberGo', message)
#Read messages from topic
from kafka import KafkaConsumer
# Build the consumer with auto_offset_reset='earliest', subscribe it to the
# request topic, and show the topic's partitions.
consumer = KafkaConsumer(
    bootstrap_servers='ip-XX-XX-XX-XX:9092',
    auto_offset_reset='earliest',
)
consumer.subscribe(['cab_request'])
topic_partitions = consumer.partitions_for_topic('cab_request')
print(topic_partitions)
# Print every record the consumer iterator yields.
for record in consumer:
    print(record)
Example 2 - Kafka Publish - Spark Consume
#KAFKA Producer
#Function Definition
from kafka import KafkaConsumer, KafkaProducer
def connect_kafka_producer():
    """Create a KafkaProducer for the demo broker.

    Returns:
        The constructed ``KafkaProducer``, or ``None`` when construction
        raised an ``Exception`` (the error is printed, not re-raised).
    """
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['ip-XX-XX-XX-XX:9092'],
            api_version=(0, 10),
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    # Return after the handler instead of from a ``finally`` clause: a
    # ``return`` inside ``finally`` would also discard KeyboardInterrupt and
    # SystemExit, which this best-effort helper should not swallow.
    return _producer
#Kafka Publish message function
#Function Definition
def publish_message(producer_instance, topic_name, key, value):
    """Send one UTF-8 encoded key/value record to ``topic_name`` and flush.

    Any ``Exception`` raised while encoding, sending or flushing is printed
    instead of propagated, so the function always returns ``None``.
    """
    try:
        # Encode both fields before touching the producer.
        encoded_key, encoded_value = [
            bytes(text, encoding='utf-8') for text in (key, value)
        ]
        producer_instance.send(
            topic_name, key=encoded_key, value=encoded_value
        )
        producer_instance.flush()
        print('Cab Booking Request published successfully.')
    except Exception as error:
        print('Exception in publishing message', str(error), sep='\n')
# Connect once, then publish one prefixed "i,j" message for every pair with
# i and j each running from 1 to 9, for the Spark job to consume.
kafka_producer = connect_kafka_producer()
for i, j in ((outer, inner) for outer in range(1, 10) for inner in range(1, 10)):
    print(i)
    print(j)
    message = 'Publish to spark from kafka' + ','.join((str(i), str(j)))
    print(message)
    publish_message(kafka_producer, 'cab_request', 'UberGo', message)
import os
import time
import sys

# Both PySpark interpreter variables are set to the same Python 3.6 binary.
os.environ.update({
    'PYSPARK_DRIVER_PYTHON': '/usr/bin/python3.6',
    'PYSPARK_PYTHON': '/usr/bin/python3.6',
})

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.streaming.kafka import TopicAndPartition

# Request the spark-streaming-kafka-0-8 package before the context is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 pyspark-shell'

conf = SparkConf().setAppName("Kafka-Spark")
sc = SparkContext.getOrCreate(conf)
# Second argument is the batch duration (2) - presumably seconds, per the
# StreamingContext API; it is not a batch count.
ssc = StreamingContext(sc, 2)
print(sc.version)

kafka_params = {"metadata.broker.list": "ip-XX-XX-XX-XX:9092"}
topic = "cab_request"
kafka_stream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)

# Split each record into its two elements - presumably (key, value) as
# produced by the direct stream; confirm against the KafkaUtils docs.
record_keys = kafka_stream.map(lambda record: record[0])
record_values = kafka_stream.map(lambda record: record[1])
record_keys.pprint()
record_values.pprint()

# Start the streaming job and block until it terminates.
ssc.start()
ssc.awaitTermination()

Happy Learning!!!

No comments: