Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS
Customers with strict compliance or data security requirements often require data to be encrypted at all times, including at rest or in transit within the AWS cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted while at rest or in transit.
Amazon Kinesis overview
The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.
Through the use of HTTPS, Amazon Kinesis Streams encrypts data in flight between clients and the service, which protects against eavesdropping on records while they are being transferred. However, records encrypted by HTTPS are decrypted once the data enters the service. This data is then stored at rest for 24 hours (configurable up to 168 hours) so that your applications have enough headroom to process, replay, or catch up if they fall behind.
Walkthrough
In this post, you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Client Library (KCL), AWS KMS, and the AWS Encryption SDK (aws-encryption-sdk). The methods and techniques used in this post to encrypt and decrypt Kinesis records can be replicated in your own architecture. Keep the following constraints in mind:
- AWS charges for the KMS API requests made for encryption and decryption. For more information, see AWS KMS Pricing.
- You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams whose records are encrypted on the client side, as they are in this sample application.
- If your application requires low-latency processing, note that encrypting and decrypting each record adds some latency.
The following diagram shows the architecture of the solution.
Encrypting the records at the producer
Before you call the PutRecord or PutRecords API (or addUserRecord, if you use the KPL), you encrypt the string record by calling KinesisEncryptionUtils.toEncryptedString.
In this example, we use a sample stock sales ticker object such as:
{"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}
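The log output later in this post prints records in the form TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', ...}, and the decrypted result uses the same form, which suggests the sample encrypts the object's string representation. The class below is a hypothetical stand-in reconstructed from that log format: the field names come from the log, while the constructor and null checks are assumptions, not the repository's code.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the sample's ticker object. Field names follow the
 * log output in this post; the real class in the repository may differ.
 */
final class TickerSalesObject {
    private final String tickerSymbol;
    private final String salesPrice;
    private final String orderId;
    private final String timeStamp;

    TickerSalesObject(String tickerSymbol, String salesPrice, String orderId, String timeStamp) {
        this.tickerSymbol = Objects.requireNonNull(tickerSymbol);
        this.salesPrice = Objects.requireNonNull(salesPrice);
        this.orderId = Objects.requireNonNull(orderId);
        this.timeStamp = Objects.requireNonNull(timeStamp);
    }

    /** Produces the same format that the producer and consumer logs print. */
    @Override
    public String toString() {
        return "TickerSalesObject{tickerSymbol='" + tickerSymbol
                + "', salesPrice='" + salesPrice
                + "', orderId='" + orderId
                + "', timeStamp='" + timeStamp + "'}";
    }
}
```

Keeping prices and IDs as strings mirrors the quoted values in the log and avoids any floating-point rounding before encryption.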
The KinesisEncryptionUtils.toEncryptedString method takes four parameters:
- com.amazonaws.encryptionsdk.AwsCrypto
- the stock sales ticker object
- com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
- a java.util.Map containing the encryption context
The ciphertext is returned to the caller, which then checks its size by calling KinesisEncryptionUtils.calculateSizeOfObject. Encryption increases the size of a record, and Kinesis Streams rejects records whose data exceeds 1 MB, so the size of each encrypted payload is validated before it is sent. In this example, encrypted records larger than 1 MB are logged as a warning and not sent. If the size is within the limit, the record is sent by calling addUserRecord if you are using the KPL, or PutRecord or PutRecords if you are using the Kinesis Streams API.
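To make the envelope-encryption idea behind toEncryptedString concrete, here is a stdlib-only sketch. It is not the AWS Encryption SDK and does not call KMS: a local AES key named masterKey stands in for the KMS customer master key, a fresh AES-256 data key encrypts each record with AES-GCM, the data key is itself encrypted ("wrapped") under masterKey, and the encryption context is bound to both as additional authenticated data, so decrypting with a different context fails. The class name, byte layout, and context encoding are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical envelope-encryption sketch; a local AES key stands in for the KMS master key. */
final class EnvelopeSketch {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int IV_BYTES = 12;                          // recommended GCM nonce size
    private static final int TAG_BITS = 128;                         // GCM authentication tag size
    private static final int WRAPPED_KEY_BYTES = 32 + TAG_BITS / 8;  // AES-256 key plus tag

    /** Sorted, deterministic encoding of the encryption context used as AAD (illustration only). */
    private static byte[] contextAad(Map<String, String> context) {
        return new TreeMap<>(context).toString().getBytes(StandardCharsets.UTF_8);
    }

    private static byte[] gcm(int mode, SecretKey key, byte[] iv, byte[] aad, byte[] input)
            throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(mode, key, new GCMParameterSpec(TAG_BITS, iv));
        cipher.updateAAD(aad);
        return cipher.doFinal(input);
    }

    private static byte[] randomIv() {
        byte[] iv = new byte[IV_BYTES];
        RANDOM.nextBytes(iv);
        return iv;
    }

    /** Returns base64(keyIv | wrappedDataKey | dataIv | ciphertext). */
    static String encrypt(SecretKey masterKey, String plaintext, Map<String, String> context)
            throws GeneralSecurityException {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        SecretKey dataKey = generator.generateKey(); // fresh data key for every record
        byte[] aad = contextAad(context);
        byte[] keyIv = randomIv();
        byte[] wrappedKey = gcm(Cipher.ENCRYPT_MODE, masterKey, keyIv, aad, dataKey.getEncoded());
        byte[] dataIv = randomIv();
        byte[] body = gcm(Cipher.ENCRYPT_MODE, dataKey, dataIv, aad,
                plaintext.getBytes(StandardCharsets.UTF_8));
        ByteBuffer out = ByteBuffer.allocate(keyIv.length + wrappedKey.length + dataIv.length + body.length);
        out.put(keyIv).put(wrappedKey).put(dataIv).put(body);
        return Base64.getEncoder().encodeToString(out.array());
    }

    /** Reverses encrypt; throws AEADBadTagException if the key, context, or ciphertext does not match. */
    static String decrypt(SecretKey masterKey, String encoded, Map<String, String> context)
            throws GeneralSecurityException {
        byte[] all = Base64.getDecoder().decode(encoded);
        byte[] aad = contextAad(context);
        int wrappedStart = IV_BYTES;
        int dataIvStart = wrappedStart + WRAPPED_KEY_BYTES;
        int bodyStart = dataIvStart + IV_BYTES;
        byte[] keyIv = Arrays.copyOfRange(all, 0, wrappedStart);
        byte[] wrappedKey = Arrays.copyOfRange(all, wrappedStart, dataIvStart);
        byte[] dataIv = Arrays.copyOfRange(all, dataIvStart, bodyStart);
        byte[] body = Arrays.copyOfRange(all, bodyStart, all.length);
        SecretKey dataKey = new SecretKeySpec(
                gcm(Cipher.DECRYPT_MODE, masterKey, keyIv, aad, wrappedKey), "AES");
        return new String(gcm(Cipher.DECRYPT_MODE, dataKey, dataIv, aad, body), StandardCharsets.UTF_8);
    }
}
```

In the sample, KMS performs the wrapping step, so the master key never leaves KMS, and the Encryption SDK uses its own message format and algorithm suites rather than this layout.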
Example: Encrypting records with KPL
//Encrypt the record
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//Skip records above the ~1MB threshold used in this sample; Kinesis rejects oversized records
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000) {
    log.warn("Encrypted record is greater than 1MB and will not be sent");
} else {
    //UTF-8 encode the encrypted record
    ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
    //Add the encrypted record to the stream; the callback handles success or failure
    ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
    Futures.addCallback(f, callback);
}
In the code above, the sample sales ticker record is passed to KinesisEncryptionUtils.toEncryptedString, which returns an encrypted string. The encryptedString value is then passed to KinesisEncryptionUtils.calculateSizeOfObject, and the returned size of the encrypted payload is checked against the 1 MB limit. If the record is within the limit, it is UTF-8 encoded (KinesisEncryptionUtils.toEncryptedByteStream) and sent to the stream for processing.
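The post describes toEncryptedByteStream only as UTF-8 encoding of the encrypted string. Below is a minimal sketch of that step and its inverse on the consumer side, plus a size check measured on the encoded bytes, which is what Kinesis actually stores. The class and method names are hypothetical, and the 1 MiB constant (1,048,576 bytes) is an assumption based on the per-record data limit at the time of writing; the sample itself uses a slightly more conservative 1,024,000 threshold.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers mirroring toEncryptedByteStream and the consumer-side decode. */
final class RecordEncoding {
    // Assumed Kinesis per-record data limit (1 MiB); check current service quotas.
    static final int MAX_RECORD_BYTES = 1024 * 1024;

    /** UTF-8 encodes the (base64) encrypted string into a ByteBuffer ready for the producer. */
    static ByteBuffer toUtf8ByteBuffer(String encrypted) {
        return ByteBuffer.wrap(encrypted.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes record data back into the encrypted string without moving the caller's buffer position. */
    static String fromUtf8ByteBuffer(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    /** True if the encoded payload fits in a single Kinesis record. */
    static boolean fitsInOneRecord(ByteBuffer data) {
        return data.remaining() <= MAX_RECORD_BYTES;
    }
}
```

Decoding from a duplicate() matters on the consumer side: the buffer returned by record.getData() can then be read again (for example, for logging) without rewinding it.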
Example: Encrypting the records with Streams PutRecord
//Encrypt the record
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//Skip records above the ~1MB threshold used in this sample; Kinesis rejects oversized records
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000) {
    log.warn("Encrypted record is greater than 1MB and will not be sent");
} else {
    //UTF-8 encode the encrypted record
    ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName(streamName);
    putRecordRequest.setData(data);
    putRecordRequest.setPartitionKey(randomPartitionKey());
    //Put the encrypted record into the stream
    kinesis.putRecord(putRecordRequest);
}
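The PutRecord example sends one record per call. If you use PutRecords instead, each request is also bounded: at the time of writing, 500 records and 5 MiB in total, counting partition keys. Below is a minimal sketch of grouping already-encrypted records under those limits; PutRecordsBatcher, Entry, and this randomPartitionKey implementation are hypothetical (the sample's own randomPartitionKey is not shown in this post), and the limits are assumptions to check against current service quotas.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical helper that groups encrypted records into PutRecords-sized batches. */
final class PutRecordsBatcher {
    // Assumed PutRecords limits at the time of writing; verify against current quotas.
    static final int MAX_RECORDS_PER_REQUEST = 500;
    static final long MAX_BYTES_PER_REQUEST = 5L * 1024 * 1024;

    /** Hypothetical stand-in for the sample's randomPartitionKey(). */
    static String randomPartitionKey() {
        return UUID.randomUUID().toString();
    }

    /** One encrypted record; each entry is assumed to already be within the per-record limit. */
    static final class Entry {
        final String partitionKey;
        final byte[] data;

        Entry(String partitionKey, byte[] data) {
            this.partitionKey = partitionKey;
            this.data = data;
        }

        /** Bytes this entry contributes to the request: data plus UTF-8 partition key. */
        long size() {
            return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    /** Greedily fills batches, starting a new one when either limit would be exceeded. */
    static List<List<Entry>> batch(List<Entry> entries) {
        List<List<Entry>> batches = new ArrayList<>();
        List<Entry> current = new ArrayList<>();
        long currentBytes = 0;
        for (Entry entry : entries) {
            long size = entry.size();
            boolean full = current.size() == MAX_RECORDS_PER_REQUEST
                    || currentBytes + size > MAX_BYTES_PER_REQUEST;
            if (full && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(entry);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Random UUID partition keys spread records evenly across shards but give up per-key ordering; if downstream processing needs ordering (for example, per ticker symbol), a deterministic key such as the symbol is the usual alternative.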
Verifying that records are encrypted
After the call to KinesisEncryptionUtils.toEncryptedString, you can print the encrypted string record just before UTF-8 encoding. The following example shows what is printed to standard output when running this sample application.
[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=
You can also verify that the record stayed encrypted in the stream by printing the UTF-8 decoded record immediately after the GetRecords API call. The following example shows the printed output when running the sample application.
[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=
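Instead of eyeballing the log, a consumer-side test can check that what it received is an encrypted message rather than plaintext. Both encrypted strings printed above are base64 and start with AYAD, which decodes to the bytes 0x01 0x80 0x03: in the AWS Encryption SDK message format (version 1), the first byte is the format version (0x01) and the second is the message type (0x80). The helper below is a hypothetical heuristic built on that observation; newer SDK releases can emit a different message format version, so treat a false result as a prompt to investigate rather than proof of plaintext.

```java
import java.util.Base64;

/** Hypothetical heuristic: does this string look like a base64 AWS Encryption SDK (format v1) message? */
final class EncryptedRecordCheck {
    private static final int VERSION_1 = 0x01;   // message format version byte
    private static final int MESSAGE_TYPE = 0x80; // message type byte in format version 1

    static boolean looksLikeEncryptionSdkMessage(String candidate) {
        byte[] decoded;
        try {
            decoded = Base64.getDecoder().decode(candidate);
        } catch (IllegalArgumentException notBase64) {
            return false; // plaintext such as JSON is not valid base64
        }
        return decoded.length >= 2
                && (decoded[0] & 0xFF) == VERSION_1
                && (decoded[1] & 0xFF) == MESSAGE_TYPE;
    }
}
```

Plaintext that happens to be valid base64 still fails the check unless its first two decoded bytes match the header, which keeps false positives unlikely for the ticker records in this post.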
Decrypting the records at the consumer
After your consumer receives the records as a list, you can get each record's data as a ByteBuffer by calling record.getData(). You then decode and decrypt the ByteBuffer by calling KinesisEncryptionUtils.decryptByteStream. This method takes five parameters:
- com.amazonaws.encryptionsdk.AwsCrypto
- the record's java.nio.ByteBuffer
- com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
- the key ARN string
- a java.util.Map containing your encryption context
A string representation of the ticker sales object is returned to the caller for further processing. In this example, it is simply printed to standard output:
[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}
Example: Decrypting records with the KCL and Streams API
//Get the encrypted record data from the Kinesis record
ByteBuffer buffer = record.getData();
//Decrypt the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto, buffer, prov, this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);
With the code above, records in the stream are decrypted using the same key ARN and encryption context that were used to encrypt them on the producer side.
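One way to confirm that a record was encrypted with the context you expect is to compare the context returned by decryption with the pairs you require. The AWS Encryption SDK stores the encryption context in the message header: decoding the header of the records printed above shows the pair Kinesis=cars alongside an SDK-added aws-crypto-public-key entry, so the returned context can contain extra entries. In the Java SDK, that context is available from CryptoResult.getEncryptionContext(). The helper below is a hypothetical sketch that requires every expected pair and tolerates extras.

```java
import java.util.Map;

/** Hypothetical check that a decrypted record carries the expected encryption context. */
final class ContextCheck {
    /**
     * True if every expected key/value pair is present in the context returned by decryption.
     * Extra entries are allowed because the SDK may add its own (e.g. "aws-crypto-public-key").
     */
    static boolean containsExpected(Map<String, String> actual, Map<String, String> expected) {
        return expected.entrySet().stream()
                .allMatch(e -> e.getValue().equals(actual.get(e.getKey())));
    }
}
```

A consumer could skip or quarantine any record for which this returns false instead of passing it on for processing.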
Maven dependencies
To use the implementation outlined in this post, add the following Maven dependencies to your pom.xml: the Bouncy Castle libraries and the AWS Encryption SDK for Java. Bouncy Castle provides a cryptography API for Java.
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-ext-jdk15on</artifactId>
<version>1.54</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-encryption-sdk-java</artifactId>
<version>0.0.1</version>
</dependency>
Summary
You can incorporate the sample code snippets above into your application, or use them as a guide, to start encrypting and decrypting records sent to and read from an Amazon Kinesis stream.
A complete producer and consumer example application, along with a more detailed step-by-step guide to developing an Amazon Kinesis producer and consumer application on AWS with encrypted records, is available in the kinesisencryption GitHub repository.
If you have questions or suggestions, please comment below.
About the Author
Temitayo Olajide is a Cloud Support Engineer with Amazon Web Services. He works with customers to provide architectural solutions, support, and guidance for implementing high-velocity streaming data applications in the cloud. In his spare time, he plays ping-pong and hangs out with family and friends.