Amazon Kinesis Firehose Data Transformation with AWS Lambda
Shiva Narayanaswamy, Solution Architect
Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES). You configure your data producers to send data to Firehose, and it automatically delivers the data to the specified destination. You can send data to your delivery stream using the Amazon Kinesis Agent or the Firehose API via the AWS SDK.
Customers have told us that they want to perform light preprocessing or mutation of the incoming data stream before writing it to the destination. Other use cases might include normalizing data produced by different producers, adding metadata to the record, or converting incoming data to a format suitable for the destination. Until now, customers have delivered data to an intermediate destination, such as an S3 bucket, and used an S3 event notification to trigger a Lambda function to perform the transformation before delivering the data to the final destination.
In this post, I introduce data transformation capabilities for your delivery streams that let you seamlessly transform incoming source data and deliver the transformed data to your destinations.
Introducing Firehose Data Transformations
With the Firehose data transformation feature, you can now specify a Lambda function that performs transformations directly on the stream when you create a delivery stream.
When you enable Firehose data transformation, Firehose buffers incoming data and invokes the specified Lambda function with each buffered batch asynchronously. The transformed data is sent from Lambda back to Firehose for buffering and then delivered to the destination. You can also choose to enable source record backup, which backs up all untransformed records to your S3 bucket while concurrently delivering transformed records to the destination.
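Under the hood, the contract between Firehose and your Lambda function is record-based: each invocation receives a batch of base64-encoded records, and the function must return a result and payload for every recordId it received. The following is a trimmed sketch of the event and the expected response; the IDs and payloads shown are illustrative placeholders, not real values:

// Event passed to the Lambda function (abridged; values illustrative)
{
  "invocationId": "...",
  "deliveryStreamArn": "...",
  "region": "us-east-1",
  "records": [
    { "recordId": "<record id supplied by Firehose>",
      "approximateArrivalTimestamp": 1483504691599,
      "data": "<base64-encoded source record>" }
  ]
}

// Expected response: one entry per recordId,
// with result "Ok", "Dropped", or "ProcessingFailed"
{
  "records": [
    { "recordId": "<same record id>",
      "result": "Ok",
      "data": "<base64-encoded transformed record>" }
  ]
}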
To get you started, we provide the following Lambda blueprints, which you can adapt to suit your needs:
- Apache Log to JSON
- Apache Log to CSV
- Syslog to JSON
- Syslog to CSV
- General Firehose Processing
Setting up Firehose Data Transformation
In the Firehose console, create a new delivery stream with an existing S3 bucket as the destination.
In the Configuration section, enable data transformation, and choose the General Firehose Processing Lambda blueprint, which takes you to the Lambda console.
Edit the code inline, and paste the following Lambda function, which I'm using to demonstrate the Firehose data transformation feature. Choose a timeout of 5 minutes. This function matches the records in the incoming stream against a regular expression. On a match, it parses the JSON record. The function then does the following:
- Picks only the RETAIL sector and drops the rest (filtering)
- Adds a TIMESTAMP to the record (mutation)
- Converts from JSON to CSV (transformation)
- Passes the processed record back into the stream for delivery
'use strict';
console.log('Loading function');

/* Stock ticker format parser */
const parser = /^\{"TICKER_SYMBOL":"[A-Z]+","SECTOR":"[A-Z]+","CHANGE":[-.0-9]+,"PRICE":[-.0-9]+\}/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found
    let dropped = 0; // Number of dropped entries

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const match = parser.exec(entry);
        if (match) {
            const parsed_match = JSON.parse(match[0]);
            if (parsed_match.SECTOR !== 'RETAIL') {
                /* Dropped event: not in the RETAIL sector, leave the record intact */
                dropped++;
                return {
                    recordId: record.recordId,
                    result: 'Dropped',
                    data: record.data,
                };
            }
            /* Transformed event: add a timestamp and convert to CSV */
            const milliseconds = new Date().getTime();
            const result = `${milliseconds},${parsed_match.TICKER_SYMBOL},${parsed_match.SECTOR},${parsed_match.CHANGE},${parsed_match.PRICE}` + '\n';
            const payload = Buffer.from(result, 'utf8').toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        }
        /* Failed event: log the error and leave the record intact */
        console.log('Failed event: ' + record.data);
        failure++;
        return {
            recordId: record.recordId,
            result: 'ProcessingFailed',
            data: record.data,
        };
        /* This would be the "identity" transformation, leaving the data intact:
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: record.data,
        }; */
    });
    console.log(`Processing completed. Successful: ${success}, dropped: ${dropped}, failed: ${failure}.`);
    callback(null, { records: output });
};
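Before wiring the function into a stream, you can exercise it locally with a hand-built event. The following is a minimal test harness; the file name and recordId are made-up placeholders:

// Minimal local test harness (run with: node test.js)
const lambda = require('./index'); // assumes the handler above is saved as index.js

const sample = { TICKER_SYMBOL: 'TGT', SECTOR: 'RETAIL', CHANGE: 2.14, PRICE: 68.26 };
const event = {
    records: [{
        recordId: 'test-record-1', // placeholder; Firehose supplies real record IDs
        data: Buffer.from(JSON.stringify(sample), 'utf8').toString('base64'),
    }],
};

lambda.handler(event, {}, (err, res) => {
    if (err) throw err;
    // Decode each returned payload to verify the CSV conversion
    res.records.forEach((r) =>
        console.log(r.result, Buffer.from(r.data, 'base64').toString('utf8')));
});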
In the Firehose console, choose the newly created Lambda function. Enable source record backup, and choose the same S3 bucket and an appropriate prefix. Firehose delivers the raw data stream to this bucket under this prefix.
Choose an S3 buffer size of 1 MB, and a buffer interval of 60 seconds. Create a Firehose delivery IAM role.
Review the configuration and create the Firehose delivery stream.
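The console steps above can also be scripted. The following sketch uses the AWS SDK for JavaScript to create an equivalent delivery stream; all ARNs, names, and prefixes are placeholders you would substitute with your own:

// Sketch: creating the delivery stream programmatically.
// All ARNs, names, and prefixes below are placeholders.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

firehose.createDeliveryStream({
    DeliveryStreamName: 'ticker-transform-stream',
    ExtendedS3DestinationConfiguration: {
        RoleARN: 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        BucketARN: 'arn:aws:s3:::my-ticker-bucket',
        Prefix: 'processed/',
        BufferingHints: { SizeInMBs: 1, IntervalInSeconds: 60 },
        ProcessingConfiguration: {
            Enabled: true,
            Processors: [{
                Type: 'Lambda',
                Parameters: [{
                    ParameterName: 'LambdaArn',
                    ParameterValue: 'arn:aws:lambda:us-east-1:123456789012:function:ticker-transform',
                }],
            }],
        },
        S3BackupMode: 'Enabled', // source record backup
        S3BackupConfiguration: {
            RoleARN: 'arn:aws:iam::123456789012:role/firehose-delivery-role',
            BucketARN: 'arn:aws:s3:::my-ticker-bucket',
            Prefix: 'source/',
        },
    },
}, (err, data) => (err ? console.error(err) : console.log(data)));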
Testing Firehose Data Transformation
You can use the AWS Management Console to ingest simulated stock ticker data. The console runs a script in your browser to put sample records in your Firehose delivery stream. This enables you to test the configuration of your delivery stream without having to generate your own test data. The following is an example from the simulated data:
{"TICKER_SYMBOL":"QXZ","SECTOR":"HEALTHCARE","CHANGE":-0.05,"PRICE":84.51}
{"TICKER_SYMBOL":"TGT","SECTOR":"RETAIL","CHANGE":2.14,"PRICE":68.26}
To test the Firehose data transformation, the Lambda function created in the previous section adds a timestamp to the records and delivers only the stocks from the "RETAIL" sector. This test demonstrates the ability to add metadata to the records in the incoming stream, and also to filter the delivery stream.
Choose the newly created Firehose delivery stream, and choose Test with demo data, Start sending demo data.
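If you prefer to generate your own test traffic instead of using the console's demo data, you can put records yourself. A short sketch with the AWS SDK for JavaScript; the delivery stream name is a placeholder:

// Sketch: sending simulated ticker records to the stream.
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose({ region: 'us-east-1' });

const records = [
    { TICKER_SYMBOL: 'QXZ', SECTOR: 'HEALTHCARE', CHANGE: -0.05, PRICE: 84.51 },
    { TICKER_SYMBOL: 'TGT', SECTOR: 'RETAIL', CHANGE: 2.14, PRICE: 68.26 },
];

firehose.putRecordBatch({
    DeliveryStreamName: 'ticker-transform-stream', // placeholder name
    Records: records.map((r) => ({ Data: JSON.stringify(r) + '\n' })),
}, (err, data) => (err ? console.error(err) : console.log('Failed puts:', data.FailedPutCount)));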
Firehose provides CloudWatch metrics about the delivery stream. Additional metrics to monitor the data processing feature are also now available.
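For example, you can read the data-processing metrics through the CloudWatch API. The metric name below (ExecuteProcessing.Success in the AWS/Firehose namespace) is an assumption based on the documented transformation metrics; verify it against the current documentation:

// Sketch: reading a data-transformation metric for the stream.
// Metric and stream names are assumptions to verify against your setup.
const AWS = require('aws-sdk');
const cloudwatch = new AWS.CloudWatch({ region: 'us-east-1' });

cloudwatch.getMetricStatistics({
    Namespace: 'AWS/Firehose',
    MetricName: 'ExecuteProcessing.Success',
    Dimensions: [{ Name: 'DeliveryStreamName', Value: 'ticker-transform-stream' }],
    StartTime: new Date(Date.now() - 3600 * 1000), // last hour
    EndTime: new Date(),
    Period: 300,
    Statistics: ['Average'],
}, (err, data) => (err ? console.error(err) : console.log(data.Datapoints)));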
The destination S3 bucket now contains the prefixes with the source record backup and the processed stream. Download a file of the processed data, and verify that the records contain the timestamp and the "RETAIL" sector data, as follows:
1483504691599,ABC,RETAIL,0.92,21.28
1483504691600,TGT,RETAIL,-1.2,61.89
1483504691600,BFH,RETAIL,-0.79,15.86
1483504691600,MJN,RETAIL,-0.27,129.37
1483504691600,WMT,RETAIL,-2.4,76.39
Conclusion
With the Firehose data transformation feature, you now have a powerful, scalable way to perform data transformations on streaming data. You can create a data lake with the raw data, and simultaneously transform data to be consumed in a suitable format by a Firehose destination.
If you have any questions or suggestions, please comment below.