
Amazon Kinesis Firehose Data Transformation with AWS Lambda


Shiva Narayanaswamy, Solution Architect

Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES). You configure your data producers to send data to Firehose and it automatically delivers the data to the specified destination. You can send data to your delivery stream using the Amazon Kinesis Agent or the Firehose API via the AWS SDK.

Customers have told us that they want to perform light preprocessing or mutation of the incoming data stream before writing it to the destination. Other use cases might include normalizing data produced by different producers, adding metadata to the record, or converting incoming data to a format suitable for the destination. At the moment, customers deliver data to an intermediate destination, such as an S3 bucket, and use an S3 event notification to trigger a Lambda function to perform the transformation before delivering it to the final destination.

In this post, I introduce data transformation capabilities on your delivery streams, to seamlessly transform incoming source data and deliver the transformed data to your destinations.

Introducing Firehose Data Transformations

With the Firehose data transformation feature, you can now specify a Lambda function that performs transformations directly on the stream when you create a delivery stream.

When you enable Firehose data transformation, Firehose buffers incoming data and invokes the specified Lambda function with each buffered batch asynchronously. The transformed data is sent from Lambda to Firehose for buffering and then delivered to the destination. You can also choose to enable source record backup, which backs up all untransformed records to your S3 bucket concurrently while delivering transformed records to the destination.
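To make the contract concrete, the following sketch shows the shape of the batch event that Firehose passes to your Lambda function and the response it expects back. The record ID and payloads here are illustrative only; each incoming record carries a recordId and base64-encoded data, and each returned record must echo the recordId, report a result of Ok, Dropped, or ProcessingFailed, and include base64-encoded data.

/* Sketch of the transformation contract (illustrative record ID and payloads) */
const sampleEvent = {
    records: [
        {
            recordId: 'example-record-id-1', // hypothetical; Firehose assigns real IDs
            data: Buffer.from('{"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}', 'utf8').toString('base64'),
        },
    ],
};

/* The function must return one entry per incoming recordId */
const sampleResponse = {
    records: [
        {
            recordId: 'example-record-id-1',
            result: 'Ok', // or 'Dropped' / 'ProcessingFailed'
            data: Buffer.from('1483504691600,TGT,RETAIL,2.14,68.26\n', 'utf8').toString('base64'),
        },
    ],
};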

To get you started, we provide the following Lambda blueprints, which you can adapt to suit your needs:

  • Apache Log to JSON
  • Apache Log to CSV
  • Syslog to JSON
  • Syslog to CSV
  • General Firehose Processing

Setting up Firehose Data Transformation

In the Firehose console, create a new delivery stream with an existing S3 bucket as the destination.


In the Configuration section, enable data transformation, and choose the generic Firehose processing Lambda blueprint, which takes you to the Lambda console.


Edit the code inline, and paste the following Lambda function, which I'm using to demonstrate the Firehose data transformation feature. Choose a timeout of 5 minutes. This function matches the records in the incoming stream against a regular expression. On a match, it parses the JSON record. The function then does the following:

  • Picks only the RETAIL sector and drops the rest (filtering)
  • Adds a TIMESTAMP to the record (mutation)
  • Converts from JSON to CSV (transformation)
  • Passes the processed record back into the stream for delivery
'use strict';
console.log('Loading function');

/* Stock ticker format parser */
const parser = /^\{"TICKER_SYMBOL":"[A-Z]+","SECTOR":"[A-Z]+","CHANGE":[-.0-9]+,"PRICE":[-.0-9]+\}/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {

        /* Firehose delivers the record payload base64-encoded */
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const match = parser.exec(entry);
        if (match) {
            const parsed_match = JSON.parse(match[0]);
            const milliseconds = new Date().getTime();
            /* Add timestamp and convert to CSV */
            const result = `${milliseconds},${parsed_match.TICKER_SYMBOL},${parsed_match.SECTOR},${parsed_match.CHANGE},${parsed_match.PRICE}` + '\n';
            const payload = Buffer.from(result, 'utf8').toString('base64');
            if (parsed_match.SECTOR !== 'RETAIL') {
                /* Dropped event, notify and leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            else {
                /* Transformed event */
                success++;
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        }
        else {
            /* Failed event, notify the error and leave the record intact */
            console.log('Failed event : ' + record.data);
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
        /* This transformation is the "identity" transformation, the data is left intact
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: record.data,
        } */
    });
    console.log(`Processing completed. Successful records ${success}, dropped records ${dropped}, failed records ${failure}.`);
    callback(null, { records: output });
};
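Before wiring the function into a delivery stream, you can exercise it locally with a small harness like the following. This is a minimal sketch: it assumes the function above is saved as index.js, and the record IDs and payloads are made up for illustration.

/* test-local.js — a minimal local sketch, assuming the function above is saved as index.js */
const { handler } = require('./index');

const records = [
    '{"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}',
    '{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}',
    'not a stock record',
].map((payload, i) => ({
    recordId: `record-${i}`, // hypothetical IDs; Firehose assigns real ones
    data: Buffer.from(payload, 'utf8').toString('base64'),
}));

handler({ records }, {}, (err, result) => {
    if (err) throw err;
    for (const r of result.records) {
        const decoded = Buffer.from(r.data, 'base64').toString('utf8');
        console.log(`${r.recordId}: ${r.result} -> ${decoded.trim()}`);
    }
});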

In the Firehose console, choose the newly created Lambda function. Enable source record backup, and choose the same S3 bucket and an appropriate prefix. Firehose delivers the raw data stream to this bucket under this prefix.


Choose an S3 buffer size of 1 MB and a buffer interval of 60 seconds. Create a Firehose delivery IAM role.


Review the configuration and create the Firehose delivery stream.
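If you prefer to script this setup instead of using the console, the same configuration can be expressed with the AWS SDK for JavaScript. The following is a sketch only; the stream name, bucket ARN, role ARN, Lambda function ARN, and prefixes are placeholders to replace with your own values.

/* create-stream.js — a sketch of the console steps above, using the AWS SDK for JavaScript (placeholder names and ARNs) */
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.createDeliveryStream({
    DeliveryStreamName: 'stock-ticker-stream',
    ExtendedS3DestinationConfiguration: {
        RoleARN: 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        BucketARN: 'arn:aws:s3:::my-firehose-bucket',
        Prefix: 'processed/',
        BufferingHints: { SizeInMBs: 1, IntervalInSeconds: 60 },
        /* Data transformation: invoke the Lambda function on each buffered batch */
        ProcessingConfiguration: {
            Enabled: true,
            Processors: [{
                Type: 'Lambda',
                Parameters: [{
                    ParameterName: 'LambdaArn',
                    ParameterValue: 'arn:aws:lambda:us-east-1:123456789012:function:firehose-transform',
                }],
            }],
        },
        /* Source record backup: keep the untransformed records under their own prefix */
        S3BackupMode: 'Enabled',
        S3BackupConfiguration: {
            RoleARN: 'arn:aws:iam::123456789012:role/firehose_delivery_role',
            BucketARN: 'arn:aws:s3:::my-firehose-bucket',
            Prefix: 'source/',
        },
    },
}).promise()
    .then(() => console.log('Delivery stream created'))
    .catch((err) => console.error(err));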

Testing Firehose Data Transformation

You can use the AWS Management Console to ingest simulated stock ticker data. The console runs a script in your browser to put sample records in your Firehose delivery stream. This enables you to test the configuration of your delivery stream without having to generate your own test data. The following is an example from the simulated data:

{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}
{"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}

To test the Firehose data transformation, the Lambda function created in the previous section adds a timestamp to the records and delivers only the stocks from the "RETAIL" sector. This test demonstrates the ability to add metadata to records in the incoming stream and to filter the delivery stream.

Choose the newly created Firehose delivery stream, and choose Test with demo data, Start sending demo data.
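If you would rather generate the test records yourself, you can also put similar records on the stream through the Firehose API. The following is a sketch using the AWS SDK for JavaScript; the stream name is a placeholder and the records mirror the simulated data shown earlier.

/* send-demo-data.js — a sketch using the AWS SDK for JavaScript; stream name and records are placeholders */
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

const records = [
    { TICKER_SYMBOL: 'QXZ', SECTOR: 'HEALTHCARE', CHANGE: -0.05, PRICE: 84.51 },
    { TICKER_SYMBOL: 'TGT', SECTOR: 'RETAIL', CHANGE: 2.14, PRICE: 68.26 },
].map((r) => ({ Data: JSON.stringify(r) + '\n' }));

firehose.putRecordBatch({
    DeliveryStreamName: 'stock-ticker-stream',
    Records: records,
}).promise()
    .then((resp) => console.log(`Failed puts: ${resp.FailedPutCount}`))
    .catch((err) => console.error(err));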


Firehose provides CloudWatch metrics about the delivery stream. Additional metrics to monitor the data processing feature are also now available.


The destination S3 bucket now contains the prefixes with the source data backup and the processed stream. Download a file of the processed data, and verify that the records contain the timestamp and the "RETAIL" sector data, as follows:

1483504691599,ABC,RETAIL,0.92,21.28
1483504691600,TGT,RETAIL,-1.2,61.89
1483504691600,BFH,RETAIL,-0.79,15.86
1483504691600,MJN,RETAIL,-0.27,129.37
1483504691600,WMT,RETAIL,-2.4,76.39

Conclusion

With the Firehose data transformation feature, you now have a powerful, scalable way to perform data transformations on streaming data. You can create a data lake with the raw data, and simultaneously transform data to be consumed in a suitable format by a Firehose destination.

If you have any questions or suggestions, please comment below.
