What Is ETL
ETL stands for Extract, Transform, Load. Extraction is the process by which data from many sources and formats is collected. The data is then processed to allow for ease of storing and future processing. This can include data cleaning, or format normalization into file structures such as JSON. From here the data can then be persisted for storage and access by interested stakeholders.
We are currently building an ETL Data processing pipeline for a project that involves raw data being extracted from multiple sources. The Data is then transformed to the required data format and cleaned. From here it can then be loaded into a data warehouse. It is here the Data scientist at the other end can then perform data analytics and mining to gain insights into the process. Developing this ETL pipeline has led to learning and utilising many interesting open source tools. This inspired us to further explore the potential of open source tooling for building pipelines.
We decided to set about implementing a streaming pipeline to process data in real-time. Due to the ever-increasing amount of data sources and data volumes being consumed, processing results in real-time is becoming more and more important. Due to the amount of data being generated by todays systems, stakeholders require the processed data faster and faster to gain insights as they happen in order to stay on top.
Providing the infrastructure between machine and data scientist opens the door to gaining valuable insights as they happen. This implementation has been developed using docker to allow for a distributed system which not only provides a decoupled, fault tolerant method of building a ETL pipeline, but allows for future developments to be integrated far easier. This blog post will detail how we developed a data processing pipeline using Open source technologies to process publicly available data (https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data). The code is available on our Github (https://github.com/akquinet/nyc-taxi-streaming) to play around with. You can even adapt the pipeline for your own purposes easily due to the distributed architecture of the pipeline!
The Data set used is the New York city taxi data. It can be found here → https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data. The New York city taxi fare prediction training set was used. The data is collected from all Taxi Fares in New York over a given period. The data points collected are:
|pickup_datetime||Timestamp of when the taxi ride began.|
|pickup_longitude||The longitude coordinate of where the taxi ride began.|
|pickup_latitude||The latitude coordinate of where the taxi ride began.|
|dropoff_longitude||The longitude coordinate of where the taxi ride ended.|
|dropoff_latitude||The latitude coordinate of where the taxi ride ended.|
|passenger_count||The number of passengers in the taxi ride.|
The goal we wanted to achieve was a fast pipeline that would take the raw data and filter out logged taxi trips that occurred outside of the New York area, and then to enrich the data by adding a Geo Point object to work with Kibanas mapping visualisation. The Pipeline needed to be fast and fault tolerant, while providing a decoupled architecture to allow for new modules to be added as needed. With these specifications in mind the following Architecture was decided upon.
Working with tools we were familiar with we decided to use Springboot as our application framework allowing us to easily work with Maven and Docker. The system would be built using Apache Camel, an EIP framework to provide the routing and integration of the various modules. Each module would use Kafka streaming as a Pub/Sub messaging service to send the data between the modules.
Various Streaming software is available such as Apache Spark, Apache Storm or Apache Flink. We decided to go with Apache Flink due to its ability to process data in real-time. Apache Flink (https://flink.apache.org/) is an open-source stream-processing framework developed by the Apache Software Foundation. The core of Apache Flink is a distributed streaming data-flow engine written in Java and Scala.
The persistence tool used was elastic search for the processed data, here Kibana was also linked to the system to provide a front end for quick dashboard development.
The whole system was Dockerized to provide ease of use between the different systems. All that was need was to run `docker-compose up` to boot up the system.
Camel: Message Producer
For the purpose of this pipeline batch data was made to mimic a stream of data, to achieve this the data was pre-sorted by the datetime column. Then it was stored in CSV format in a data folder within the application repository. Using a camel route, the data file was consumed from this folder and spilt into its individual lines. From here it was then streamed to the camel endpoint. In this case it is a Kafka Topic named raw Data. This is our extraction module of the processing pipeline where the data is consumed from its data source and converted to the desired format (in this case JSON) and then passed onto the next module for the transformation step.
from(inputFilePath).routeId("NYC Taxi Streamer") .streamCaching() .split(body().tokenize("\n")).streaming() .unmarshal().bindy(BindyType.Csv, TaxiRide.class) .marshal().json(JsonLibrary.Jackson, TaxiRide.class) .to(outputKafkaPath);
The Apache Flink implementation is then ran as a separate Springboot module within the maven project. Here we build our Flink environment, attach our producers and sinks to our Kafka implementation and create our mappers and filters for data cleaning and enhancing.
We begin by setting up the Flink environment and creating the Flink Kafka Consumer, the method by which Flink will connect to our Raw Taxi Kafka Topic. We then add our new Flink Kafka Connector as a DataStream source to the environment. We can see it consumes taxiRide Objects, a POJO we created to hold the incoming JSON Data.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<TaxiRide> flinkKafkaConsumer = createStringConsumerForTopic(
inputTopic, address, groupId);
DataStream<TaxiRide> stringInputStream = environment.addSource(flinkKafkaConsumer);
Before we create or maps and filters we must also create a Flink Kafka Producer, this writes the outgoing messages to the desired Kafka topic to send to the next Pipeline module.
FlinkKafkaProducer<EnrichedRide> flinkKafkaProducer = createStringProducer(
The stringInputStream data is then passed to our filter, a simple filter that removes any data points outside the New York City area. From here we enrich our data, by creating a GeoPoint object containing a Geohash for each taxi ride, a data format we can use to plot the location of each pickup and drop off in real-time on a Kibana Map.
DataStream<TaxiRide> filterNYCRide = stringInputStream.filter(new NYCFilter());
DataStream<EnrichedRide> enrichedNYCRides = filterNYCRide.map(new Enrichment());
Finally, the data is passed to the Flink Kafka Producer as a sink for transfer to our specified Kafka Topic.
Processed JSON Data Hub
The data has now been Extracted and transformed in real-time to our desired format. It’s now time to load it into our desired Database. We pass the data to the Kafka topic CookedData where we then use camels’ endpoints to write to the database.
from(inputKafkaPath).routeId("Write to Elastic Search")
For this example, it was decided to use Elasticsearch coupled with Kibana. This provided an easily implemented, low latency database with a front end that could update in real-time as new data is loaded.
The code can be found on our Github at: https://github.com/akquinet/nyc-taxi-streaming
After Building the project using maven, all you need to do is run docker-compose up and head to the Kibana instance in your browser using `https://localhost:5601`. Here you can start playing around with the data using map graphics and dashboards.
Moving Towards Data Science
Now that the Data Source is being processed through the pipeline and the processed data is stored in the Database for real time analytics, we can utilise tools such as a Python Jupyter Notebook to perform data analysis with batches of Data for further exploration, data mining and machine learning.
How can I get this instance running locally?
- Git Clone the project you can find it on the Akquinet Git here: https://github.com/akquinet/nyc-taxi-streaming
- There are some requirements necessary to build the code from scratch:
- Java JDK1.8
- docker (docker agent must be running)
The buildsystem is based on maven. To build all the components just enter:
- $ mvn clean install
When the build is finished successfully you can startup your system.
In the root folder you will find a `docker-compose.yml` file from which all services will run
Startup the complete system
- docker-compose up
The Camel service will automatically create a ‘Data’ folder in the parent directory, this is where the data file should be placed for consumption.
Then navigating to the Kibana Instance `https://localhost:5601`in your browser. From here you can create visualisations to find insights into the processed data
Shutdown the complete system
- docker-compose down
In this post we described our method of building a ETL pipeline to enable real-time data analytics and further exploration on batched data. There are many open source tools out there to help build a robust pipeline, and we only scratched the surface. However this is a good start to understanding the wealth of data available to us, and gaining valuable insights to help further understand the problem at hand.