CDP part 6: end-to-end data lakehouse ingestion pipeline with CDP

In this hands-on lab session we demonstrate how to build an end-to-end big data solution with Cloudera Data Platform (CDP) Public Cloud, using the infrastructure we have deployed and configured over the course of the series.

This is the final article in a series of six.

Our objective is to provide a self-service data analytics solution that enables end-users to analyze stock data. On a high level, this encompasses the following steps:

  1. Configure automatic data ingestion from a public API to our S3 data lake.
  2. Provide an ELT solution to move data from our Data Lake to the Data Warehouse.
  3. Create a dashboard to visualize the data stored in the warehouse.

Preparation: Get Alpha Vantage API Key

Before we begin with our data ingestion, we need to get access to an API. For our purposes, we are going to use a free stock market API by Alpha Vantage:

  1. Navigate to Alpha Vantage




    Alpha Vantage Free API

  2. Click on: Get Your Free Api Key Today

  3. Fill in the information to claim the key:

    1. Student
    2. School/organization name
    3. Valid email address
  4. Click on GET FREE API KEY




    Claim your free API key

  5. Take note of your key, as you will need it later.

Access your CDP Public Cloud Portal

To get started, you need to have prepared a user account on your CDP infrastructure as described in our previous articles CDP part 4: user management on CDP Public Cloud with Keycloak and CDP part 5: user permission management on CDP Public Cloud.

  1. Log in via your custom login page with a user you created for this exercise.




    Keycloak example login page

  2. After login, you are redirected to the CDP console.




    CDP Console

Note that if you did not configure your Keycloak instance to use SSL/TLS, you may see a non-secure site warning at this step.

Set your Workload Password

After the first login with your CDP user, you are required to set a workload password. This enables you to perform tasks using CDP services.

  1. Click on your name in the bottom left corner and click on Profile




    Access your profile

  2. Click on Set Workload Password




    Set a workload password

  3. If you successfully set your password, you see the message "Workload password is currently set" on your profile.

Note: You may reset your password later in case you lose it.

Data Ingestion: Set up a DataFlow

We are using CDP’s DataFlow service to ingest data from our API to our Data Lake. Remember that DataFlow is powered by Apache NiFi.

Import a Flow definition

  1. Navigate to the CDP portal and select the DataFlow icon




    Access Data Flow

  2. On the left menu click on Catalog and then on Import Flow Definition




    Import a Flow definition

  3. Import the NiFi Flow and fill in the parameters as follows:

    • Flow name: <username>_stock_data
    • Flow description: (optional)
    • Import: the NiFi Flow definition file
    • Click on Import




    Import a Flow definition

Deploy a NiFi flow

  1. Click on the flow definition created in the previous step

  2. Click on Deploy




    Deploy a NiFi flow

  3. Select your existing CDP Public Cloud environment as Target Environment

  4. Click on Continue




    Create a new deployment

  5. Set the Deployment Name: <username>_stock_data




    Set a deployment name

  6. Do not modify the NiFi Configuration tab; click on Next




    Configure a deployment

  7. In the Parameters tab, set:

    • api_alpha_key: <your Alpha Vantage API key>
    • s3_path: stocks
    • stock_list: default
    • workload_password: <your workload password>
    • workload_username: <your CDP workload username>




    Configure deployment parameters

  8. In the Sizing & Scaling tab, set:

    • NiFi Node Sizing: Extra Small
    • Auto Scaling: Disabled
    • Nodes: 1




    Configure scaling

  9. In the Key Performance Indicators tab, make no modifications and click on Next




    Skip the KPI configuration

  10. Review your configuration, then click on Deploy




    Review and deploy

This last step launches the NiFi flow. It should take a few minutes until the flow is up and running. You may check the progress on the Dashboard tab in the CDF page.

View your NiFi flow

It is possible to check and review the flow in the web interface once it is up and running:

  1. Click on the blue arrow on the right of your deployed flow




    Data Flow Overview

  2. Click on Manage Deployment in the top right corner




    Manage Deployment button

  3. In the Deployment Manager, click on Actions and then on View in NiFi




    View NiFi

  4. This opens another browser tab with the NiFi flow




    NiFi flow

  5. Take a few minutes to explore and understand the different components of the flow

  6. As there is no need to continuously ingest data in order to continue with the lab, go back to Deployment Manager, Actions, and click on Suspend flow

Analytical Storage: Data Warehouse

Our next step is to transfer our raw data from the Data Lake to an analytical store. We choose an Apache Iceberg table for this purpose: a modern, open table format with many advantages. Now we are going to create the Iceberg table.

Create an Iceberg table

From the CDP Portal:

  1. Select Data Warehouse




    Navigate to Data Warehouse

  2. Click the HUE button in the top right corner; this opens the HUE Editor




    Hue Button




    Hue Editor

  3. Create a database using your username as a prefix:

    CREATE DATABASE <username>_stocks;




    DB Creation with Hue

  4. Create an Iceberg table stock_intraday_1min on the database created in the previous step:

    CREATE TABLE IF NOT EXISTS <username>_stocks.stock_intraday_1min (
      interv STRING
      , output_size STRING
      , time_zone STRING
      , open DECIMAL(8,4)
      , high DECIMAL(8,4)
      , low DECIMAL(8,4)
      , close DECIMAL(8,4)
      , volume BIGINT)
    
    PARTITIONED BY (
      ticker STRING
      , last_refreshed STRING
      , refreshed_at STRING)
    
    STORED AS iceberg;




    Iceberg table creation

  5. Perform a SELECT to verify that the required permissions have been set

    SELECT * FROM <username>_stocks.stock_intraday_1min;




    Selecting from an Iceberg table

Create a pipeline to load data

Now that our Iceberg table is ready and our data is loaded to the data lake, we need to create a pipeline. This pipeline needs to detect new files in our data lake and load their content into the Iceberg table. The service we use for this purpose is Data Engineering which, as we may remember, is built on Apache Spark.

From the CDP Portal:

  1. Download this .jar file with a pre-compiled Apache Spark job: stockdatabase_2.12-1.0.jar

  2. Select Data Engineering




    Select Data Engineering

  3. On the available Virtual Cluster, click the View Jobs button in the top right corner




    View Jobs

  4. Navigate to the Jobs tab and click on Create a Job




    Create a Job

  5. Set the Job details:

    • Job type: Spark 3.2.0
    • Name: <username>_StockIceberg
    • Application File: Upload the stockdatabase_2.12-1.0.jar file downloaded earlier
    • Main Class: com.cloudera.cde.stocks.StockProcessIceberg
    • Arguments:




    Upload resource




    Job details

  6. Click on Create and Run




    Create and run

  7. Navigate to Jobs and select the job created above to check the status.




    View Job status

This application does the following:

  • Checks for new files in the new directory
  • Creates a temporary table in Spark and identifies duplicated rows (in case NiFi loaded the same data again)
  • Performs a MERGE INTO on the final table, inserting new data or updating rows that already exist (see the SQL sketch below this list)
  • Archives the files in the bucket
  • After execution, the processed files remain in your S3 bucket but are moved into the processed-data directory
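For reference, the deduplication and merge performed by the job are conceptually similar to the Spark SQL statement below. This is a minimal sketch rather than the job's actual code: the staging table name stock_intraday_1min_stage and the matching keys are assumptions made for illustration.

    MERGE INTO <username>_stocks.stock_intraday_1min t  -- final Iceberg table
    USING stock_intraday_1min_stage s                    -- hypothetical deduplicated staging table
    ON t.ticker = s.ticker
      AND t.refreshed_at = s.refreshed_at                -- assumed uniqueness keys
    WHEN MATCHED THEN UPDATE SET *                       -- refresh rows that already exist
    WHEN NOT MATCHED THEN INSERT *;                      -- add brand-new rows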

Serving Layer: A Dashboard in CDP Data Visualization

The final step in our end-to-end solution is to create the self-service solution. For this, we use the built-in Data Visualization feature of the Data Warehouse service.

Create a dataset

  1. Navigate back to the Cloudera Data Warehouse

  2. On the left menu choose: Data Visualization and click the Data VIZ button on the right.




    Data Viz

  3. On the top of the screen click on DATA




    DATA

  4. On the left select the dwh-impala-connection connection




    Impala connection

  5. Click on NEW DATASET and set:

    • Dataset title: <username>_dataset
    • Dataset Source: From Table
    • Select Database: <username>_stocks
    • Select Table: stock_intraday_1min
    • Click on Create




    New dataset

Create a dashboard

  1. Click on New Dashboard




    New Dashboard

  2. Wait a few seconds until the following screen appears




    New Dashboard

  3. On the Visuals tab, drag and drop the following (an approximate reference query is shown after the screenshots below):

    • Dimensions: ticker
    • Measure: volume
    • REFRESH VISUAL
    • Visuals -> Packed Bubbles




    Data visualization




    Data visualization
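
    The Packed Bubbles visual aggregates the total trading volume per ticker. As a point of reference only (Data Visualization generates its own queries), the result is conceptually similar to this Impala aggregation:

    SELECT ticker, SUM(volume) AS total_volume  -- total traded volume per stock symbol
    FROM <username>_stocks.stock_intraday_1min
    GROUP BY ticker;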

  4. Save the Dashboard and make it public

    1. Enter a title: Dashboard
    2. Navigate to the top left corner and click on Save
    3. Change: Private -> Public
    4. Click Move




    Public Dashboard

And that is it! You have now created an end-to-end big data solution with CDP Public Cloud. Finally, let's track an additional stock and see the new data appear in the Data Warehouse.

Iceberg snapshots

Let’s see the Iceberg table history

  1. Go back to the Hue Editor

  2. Execute the following query and take note of the snapshot_id:

    DESCRIBE HISTORY <username>_stocks.stock_intraday_1min;




    Iceberg table history

  3. Execute the following Impala query:

    SELECT count(*), ticker
    FROM <username>_stocks.stock_intraday_1min
    FOR SYSTEM_VERSION AS OF <snapshot_id>
    GROUP BY ticker;




    Impala query
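
Note that Impala can also time travel by timestamp on Iceberg tables. The query below is an illustrative variant; the timestamp is an arbitrary example, so use a value that falls within your own table history:

    SELECT count(*), ticker
    FROM <username>_stocks.stock_intraday_1min
    FOR SYSTEM_TIME AS OF '2023-01-01 10:00:00'  -- example timestamp, adjust to your snapshot history
    GROUP BY ticker;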

Adding a new stock

  1. Go back to the Deployment Manager of your NiFi Flow (See Step 6)

  2. Select Parameters




    Flow parameters

  3. Add the stock NVDA (NVIDIA) to the stock_list parameter and click on Apply Changes




    Add Stock

  4. Once the changes are applied, click on Actions, Start flow

Re-run the Spark Job

  1. Go back to the Data Engineering service, tab Jobs

  2. Click on the three dots next to your job and click on Run now




    Re-run Spark job

Check new snapshot history

  1. Go back to the Hue Editor

  2. Check the snapshot history again and take note of the new snapshot_id:

    DESCRIBE HISTORY <username>_stocks.stock_intraday_1min;
  3. Check the new snapshot data




    Updated History

  4. Execute a SELECT using the new snapshot_id to see the newly added stock data (an example follows the screenshot below)




    Select from updated history
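
    For example, reuse the time travel query from above with the new snapshot_id (the <new_snapshot_id> placeholder stands for the value noted in the refreshed history):

    SELECT count(*), ticker
    FROM <username>_stocks.stock_intraday_1min
    FOR SYSTEM_VERSION AS OF <new_snapshot_id>
    GROUP BY ticker;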

  5. Run this query without a snapshot to get all values, from both parent and child snapshots:

    SELECT *
    FROM <username>_stocks.stock_intraday_1min;
  6. To check the files in the S3 bucket:

    SHOW FILES IN <username>_stocks.stock_intraday_1min;




    Show files in the Iceberg table

Play around with the visuals

Go back to Data Visualization and explore the different options available for the dashboard.




Updated visuals 1




Updated visuals 2




Updated visuals 3

This series covered all the tasks needed to set up a data engineering pipeline from square one. We started by deploying the CDP Public Cloud infrastructure on AWS resources, configured Keycloak for user authentication on that same cluster, managed user permissions, and finally constructed a pipeline using the different CDP services. There are some advanced features you may experiment with if you are so inclined. That said, remember that resources created on AWS are not free and that you will incur costs while your infrastructure is active. Remember to release all your AWS resources when you are done with the lab to avoid unwanted charges.
