Lab: Data Streaming Pipeline with Kafka for livetolldata
Objective
In this lab, we will first create a toll database in MySQL and define a table schema in it. We will then start the Kafka server and stream toll data events with a Kafka producer. On the other side, a Kafka consumer will receive the streamed events and write them into the MySQL table we created.
Introduction
You are a data engineer at a data analytics consulting company. You have been assigned to a project that aims to decongest the national highways by analyzing road traffic data from different toll plazas. As a vehicle passes a toll plaza, the vehicle’s data, such as vehicle_id, vehicle_type, toll_plaza_id, and timestamp, is streamed to Kafka. Your job is to create a data pipeline that collects the streaming data and loads it into a database.
In this assignment, you will create a streaming data pipeline by performing these steps:
- Start a MySQL database server.
- Create a table to hold the toll data.
- Start the ZooKeeper and Kafka servers.
- Install the Kafka Python driver.
- Install the MySQL Python driver.
- Create a topic named toll in Kafka.
- Download the streaming data generator program.
- Customize the generator program to stream to the toll topic.
- Download the streaming data consumer program.
- Customize the consumer program to write into a MySQL database table.
- Verify that streamed data is being collected in the database table.
Prepare the lab environment
Before you start the assignment, complete the following steps to set up the lab:
# Download Kafka
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
# Extract Kafka
tar -xzf kafka_2.12-2.8.0.tgz
# Start MySQL server
start_mysql
# Connect to the MySQL server
mysql --host=<hostname> --port=3306 --user=<username> --password
# Create a database named `tolldata`
# At the 'mysql>' prompt, run the command below to create the database.
create database tolldata;
# Create a table named `livetolldata` with the schema to store the data generated by the traffic simulator
use tolldata;
create table livetolldata(timestamp datetime, vehicle_id int, vehicle_type char(15), toll_plaza_id smallint);
# Disconnect from the MySQL server
exit
# Install the Python module `kafka-python` using pip
pip install kafka-python
# Install the Python module `mysql-connector-python` using pip
pip install mysql-connector-python
livetolldata is the table where you will store all the data streamed from Kafka. Each row records when a vehicle passed through a particular toll plaza, along with the vehicle's type and anonymized ID.
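To make the schema concrete, here is a minimal Python sketch of one livetolldata row. The `TollRecord` class and its `validate` and `as_row` methods are hypothetical helpers and are not part of the lab files. The limits mirror the column types in the `create table` statement: `char(15)` for vehicle_type, and MySQL's signed `INT` and `SMALLINT` ranges for the two ID columns.

```python
from dataclasses import dataclass
from datetime import datetime

# Signed ranges of the MySQL column types used by livetolldata.
INT_RANGE = (-2_147_483_648, 2_147_483_647)
SMALLINT_RANGE = (-32_768, 32_767)
VEHICLE_TYPE_MAX_LEN = 15  # vehicle_type is char(15)


@dataclass(frozen=True)
class TollRecord:
    """One row of livetolldata (hypothetical helper, for illustration only)."""

    timestamp: datetime
    vehicle_id: int
    vehicle_type: str
    toll_plaza_id: int

    def validate(self) -> None:
        """Raise ValueError if a field would not fit its MySQL column."""
        if not INT_RANGE[0] <= self.vehicle_id <= INT_RANGE[1]:
            raise ValueError(f"vehicle_id out of INT range: {self.vehicle_id}")
        if len(self.vehicle_type) > VEHICLE_TYPE_MAX_LEN:
            raise ValueError(f"vehicle_type longer than 15 chars: {self.vehicle_type!r}")
        if not SMALLINT_RANGE[0] <= self.toll_plaza_id <= SMALLINT_RANGE[1]:
            raise ValueError(f"toll_plaza_id out of SMALLINT range: {self.toll_plaza_id}")

    def as_row(self) -> tuple:
        """Return the values in the column order of livetolldata."""
        return (self.timestamp, self.vehicle_id, self.vehicle_type, self.toll_plaza_id)


if __name__ == "__main__":
    # Sample values are made up for illustration.
    record = TollRecord(datetime(2024, 1, 1, 10, 0, 0), 4_123_456, "car", 4_001)
    record.validate()
    print(record.as_row())
```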
Assignment
Task 2.1 - Start ZooKeeper
Start the ZooKeeper server.
Take a screenshot of the command you run.
Name the screenshot start_zookeeper.jpg. (Images can be saved with either the .jpg or .png extension.)
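For Task 2.1, the Kafka 2.8.0 distribution extracted during setup ships `bin/zookeeper-server-start.sh` and a default `config/zookeeper.properties`. The usual shell command, run from inside kafka_2.12-2.8.0, is `bin/zookeeper-server-start.sh config/zookeeper.properties`. The sketch below builds the same command in Python. The helper name `zookeeper_start_command` is hypothetical, and the sketch assumes the archive was extracted into the current directory.

```python
import subprocess
from pathlib import Path
from typing import List

# Directory created by `tar -xzf kafka_2.12-2.8.0.tgz` (assumed to be in the cwd).
KAFKA_HOME = Path("kafka_2.12-2.8.0")


def zookeeper_start_command(kafka_home: Path = KAFKA_HOME) -> List[str]:
    """Build the command that starts ZooKeeper with the bundled default config."""
    return [
        str(kafka_home / "bin" / "zookeeper-server-start.sh"),
        str(kafka_home / "config" / "zookeeper.properties"),
    ]


if __name__ == "__main__":
    # Like the shell command, this runs in the foreground until interrupted,
    # so start it in a terminal of its own.
    subprocess.run(zookeeper_start_command(), check=True)
```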
Task 2.2 - Start Kafka server
Start the Kafka server.
Take a screenshot of the command you run.
Name the screenshot start_kafka.jpg. (Images can be saved with either the .jpg or .png extension.)
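For Task 2.2, the broker is started with `bin/kafka-server-start.sh config/server.properties`. This should be done after ZooKeeper is up. The sketch below launches the broker in the background and then polls TCP port 9092 until it accepts connections. Port 9092 is the default listener port, and the sketch assumes `server.properties` was left unchanged. The helpers `kafka_start_command` and `wait_for_port` are hypothetical names. An open port is a good sign but does not by itself guarantee the broker has finished starting up.

```python
import socket
import subprocess
import time
from pathlib import Path
from typing import List

# Directory created by `tar -xzf kafka_2.12-2.8.0.tgz` (assumed to be in the cwd).
KAFKA_HOME = Path("kafka_2.12-2.8.0")


def kafka_start_command(kafka_home: Path = KAFKA_HOME) -> List[str]:
    """Build the command that starts a broker with the bundled server.properties."""
    return [
        str(kafka_home / "bin" / "kafka-server-start.sh"),
        str(kafka_home / "config" / "server.properties"),
    ]


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not listening yet; retry shortly
    return False


if __name__ == "__main__":
    # Start the broker as a background process; it keeps running after this script exits.
    subprocess.Popen(kafka_start_command())
    if wait_for_port("localhost", 9092):
        print("Kafka broker is accepting connections on localhost:9092")
    else:
        print("Port 9092 did not open in time; check the broker's log output")
```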
Task 2.3 - Create a topic named toll
Create a Kafka topic named toll.
Take a screenshot of the command you run.
Name the screenshot create_toll_topic.jpg. (Images can be saved with either the .jpg or .png extension.)
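For Task 2.3, topics are created with the bundled `kafka-topics.sh` tool. In Kafka 2.8.0 it accepts `--bootstrap-server`. When partitions and replication factor are omitted, the broker defaults apply. The sketch builds the creation command and then lists topics to confirm the result. The helper `create_topic_command` is a hypothetical name, and `localhost:9092` assumes the default listener.

```python
import subprocess
from pathlib import Path
from typing import List

# Directory created by `tar -xzf kafka_2.12-2.8.0.tgz` (assumed to be in the cwd).
KAFKA_HOME = Path("kafka_2.12-2.8.0")


def create_topic_command(topic: str, bootstrap: str = "localhost:9092",
                         kafka_home: Path = KAFKA_HOME) -> List[str]:
    """Build a kafka-topics.sh invocation that creates `topic` on the given broker."""
    return [
        str(kafka_home / "bin" / "kafka-topics.sh"),
        "--create",
        "--topic", topic,
        "--bootstrap-server", bootstrap,
    ]


if __name__ == "__main__":
    subprocess.run(create_topic_command("toll"), check=True)
    # List all topics afterwards to confirm that "toll" now exists.
    subprocess.run([str(KAFKA_HOME / "bin" / "kafka-topics.sh"), "--list",
                    "--bootstrap-server", "localhost:9092"], check=True)
```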
Task 2.4 - Configure the Toll Traffic Simulator
Open toll_traffic_generator.py and set the topic to toll.
Take a screenshot of the code with the topic clearly visible.
Name the screenshot configure_simulator.jpg. (Images can be saved with either the .jpg or .png extension.)
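The change in Task 2.4 is a single topic setting, but it helps to see where the topic is used. Below is a minimal producer sketch, not the actual simulator. It assumes each event is a comma-separated string of the form `timestamp,vehicle_id,vehicle_type,toll_plaza_id`, with a `time.asctime()`-style timestamp. It also assumes a broker on `localhost:9092`. The vehicle types and ID ranges are hypothetical placeholders. `KafkaProducer`, `send`, and `flush` are kafka-python calls.

```python
import random
import time
from typing import Optional

TOPIC = "toll"  # the value Task 2.4 asks for

# Hypothetical values; the real simulator may use different types and ID ranges.
VEHICLE_TYPES = ("car", "car", "car", "truck", "van", "motorcycle")


def make_toll_event(vehicle_id: int, vehicle_type: str, plaza_id: int,
                    now: Optional[float] = None) -> str:
    """Format one event as 'timestamp,vehicle_id,vehicle_type,plaza_id' (assumed format)."""
    # time.localtime(None) uses the current time.
    return f"{time.asctime(time.localtime(now))},{vehicle_id},{vehicle_type},{plaza_id}"


def publish(count: int = 10) -> None:
    """Send `count` random events to TOPIC (requires a running broker)."""
    from kafka import KafkaProducer  # installed with `pip install kafka-python`

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    for _ in range(count):
        event = make_toll_event(random.randint(100000, 999999),
                                random.choice(VEHICLE_TYPES),
                                random.randint(4000, 4010))
        producer.send(TOPIC, event.encode("utf-8"))  # Kafka messages are bytes
        print(event)
        time.sleep(random.random())  # irregular gaps, like real traffic
    producer.flush()  # block until every buffered message is delivered


if __name__ == "__main__":
    publish()
```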
Task 2.5 - Run the Toll Traffic Simulator
Run toll_traffic_generator.py.
Take a screenshot of the simulator's output.
Name the screenshot simulator_output.jpg. (Images can be saved with either the .jpg or .png extension.)
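The simulator in Task 2.5 may run for a long time. If you only want a few lines of its output, a small wrapper can start it, read the first lines, and stop it. This is a sketch under two assumptions: the script prints one line per event to stdout, and it sits in the current directory. `head_of_output` is a hypothetical helper. The `-u` flag makes the child Python process write its output unbuffered, so lines arrive as they are printed.

```python
import subprocess
import sys
from typing import List


def head_of_output(cmd: List[str], n: int = 5) -> List[str]:
    """Run cmd, collect its first n stdout lines, then stop the process."""
    lines: List[str] = []
    # The context manager closes the pipe and waits for the process on exit.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
        try:
            for line in proc.stdout:
                lines.append(line.rstrip("\n"))
                if len(lines) >= n:
                    break
        finally:
            proc.terminate()  # stop the process even if it would keep producing output
    return lines


if __name__ == "__main__":
    for line in head_of_output([sys.executable, "-u", "toll_traffic_generator.py"]):
        print(line)
```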
Task 2.6 - Configure the Streaming Data Reader
Open streaming_data_reader.py and modify the following details so that the program reads from the toll topic and can connect to your MySQL server:
- TOPIC
- DATABASE
- USERNAME
- PASSWORD
Take a screenshot of the code you modified.
Name the screenshot streaming_reader_code.jpg. (Images can be saved with either the .jpg or .png extension.)
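The four settings in Task 2.6 end up as arguments to `mysql.connector.connect()`, which accepts `host`, `port`, `database`, `user`, and `password`. Here is a sketch that reads them from environment variables instead of hard-coding the password. The variable names (`TOLL_TOPIC`, `TOLL_DATABASE`, `MYSQL_USER`, `MYSQL_PASSWORD`) and the `root` default are hypothetical choices. The defaults `toll` and `tolldata` come from the earlier steps.

```python
import os
from typing import Any, Dict

# Settings Task 2.6 asks you to fill in; the environment variable names are hypothetical.
TOPIC = os.environ.get("TOLL_TOPIC", "toll")
DATABASE = os.environ.get("TOLL_DATABASE", "tolldata")
USERNAME = os.environ.get("MYSQL_USER", "root")  # assumed default user
PASSWORD = os.environ.get("MYSQL_PASSWORD", "")


def mysql_connect_kwargs(host: str = "localhost", port: int = 3306) -> Dict[str, Any]:
    """Keyword arguments for mysql.connector.connect(), built from the settings above."""
    return {
        "host": host,
        "port": port,
        "database": DATABASE,
        "user": USERNAME,
        "password": PASSWORD,
    }


if __name__ == "__main__":
    import mysql.connector  # installed with `pip install mysql-connector-python`

    connection = mysql.connector.connect(**mysql_connect_kwargs())
    print("Connected:", connection.is_connected())
    connection.close()
```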
Task 2.7 - Run the Streaming Data Reader
Run streaming_data_reader.py.
Take a screenshot of the output of streaming_data_reader.py.
Name the screenshot data_reader_output.jpg. (Images can be saved with either the .jpg or .png extension.)
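What the reader in Task 2.7 does with each message can be sketched as follows. It decodes the Kafka message, splits it into the four livetolldata fields, and converts the timestamp to MySQL's `YYYY-MM-DD HH:MM:SS` DATETIME format. It then inserts the row with a parameterized query, using mysql-connector's `%s` placeholders. The message layout (comma-separated, `time.asctime()`-style timestamp) is an assumption about the simulator. `parse_toll_event` and `consume` are hypothetical names, and the `<username>`/`<password>` placeholders must be replaced with your own values.

```python
from datetime import datetime
from typing import Tuple

# Parameterized INSERT in the column order of livetolldata.
INSERT_SQL = (
    "INSERT INTO livetolldata (timestamp, vehicle_id, vehicle_type, toll_plaza_id) "
    "VALUES (%s, %s, %s, %s)"
)


def parse_toll_event(message: str) -> Tuple[str, int, str, int]:
    """Turn 'Mon Jan  1 10:00:00 2024,123456,car,4001' into a livetolldata row.

    Assumes the simulator's comma-separated, time.asctime()-style format.
    """
    timestamp, vehicle_id, vehicle_type, plaza_id = message.strip().split(",")
    parsed = datetime.strptime(timestamp, "%a %b %d %H:%M:%S %Y")
    # Re-format as a MySQL DATETIME literal.
    return (parsed.strftime("%Y-%m-%d %H:%M:%S"), int(vehicle_id), vehicle_type, int(plaza_id))


def consume(topic: str = "toll", **connect_kwargs) -> None:
    """Read events from Kafka and insert them into MySQL (needs both servers running)."""
    from kafka import KafkaConsumer
    import mysql.connector

    connection = mysql.connector.connect(**connect_kwargs)
    cursor = connection.cursor()
    consumer = KafkaConsumer(topic, bootstrap_servers="localhost:9092")
    for msg in consumer:  # blocks and waits for new messages
        row = parse_toll_event(msg.value.decode("utf-8"))
        cursor.execute(INSERT_SQL, row)
        connection.commit()  # commit per event so rows become visible immediately
        print("Inserted", row)


if __name__ == "__main__":
    consume("toll", host="localhost", database="tolldata",
            user="<username>", password="<password>")
```

Committing after every message keeps the sketch simple. Batching commits would reduce database round trips at the cost of slightly delayed visibility.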
Task 2.8 - Health check of the streaming data pipeline
If you have completed all the previous steps correctly, the streamed toll data should now be stored in the livetolldata table.
List the top 10 rows in the livetolldata table.
Take a screenshot of the command and the output.
Name the screenshot output_rows.jpg. (Images can be saved with either the .jpg or .png extension.)
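At the `mysql>` prompt, the check in Task 2.8 is `select * from livetolldata limit 10;`. The same query can also be run from Python through any DB-API 2.0 connection, as sketched below. The helper `top_rows` is a hypothetical name, and the `<username>`/`<password>` placeholders must be replaced with your own values. Without an `ORDER BY`, MySQL does not guarantee which 10 rows come back. Adding `ORDER BY timestamp DESC` would show the most recent events instead.

```python
from typing import Any, List, Sequence

TOP_ROWS_SQL = "SELECT * FROM livetolldata LIMIT 10"


def top_rows(connection: Any) -> List[Sequence[Any]]:
    """Return up to 10 rows of livetolldata using any DB-API 2.0 connection."""
    cursor = connection.cursor()
    try:
        cursor.execute(TOP_ROWS_SQL)
        return list(cursor.fetchall())
    finally:
        cursor.close()


if __name__ == "__main__":
    import mysql.connector  # installed with `pip install mysql-connector-python`

    conn = mysql.connector.connect(host="localhost", database="tolldata",
                                   user="<username>", password="<password>")
    for row in top_rows(conn):
        print(row)
    conn.close()
```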
This concludes the assignment.