Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS

Customers with strict compliance or data security requirements often require data to be encrypted at all times, including at rest or in transit within the AWS cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted while at rest or in transit.

Amazon Kinesis overview

The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.

Amazon Kinesis Streams uses HTTPS to encrypt data in flight between clients and the service, which protects against eavesdropping on records as they are transferred. However, the records encrypted by HTTPS are decrypted once the data enters the service. This data is stored at rest for 24 hours (configurable up to 168 hours) to ensure that your applications have enough headroom to process, replay, or catch up if they fall behind.

Walkthrough

In this post, you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Client Library (KCL), AWS KMS, and the aws-encryption-sdk. The methods and techniques used in this post to encrypt and decrypt Kinesis records can be replicated in your own architecture. Some constraints:

  • AWS charges for the KMS API requests made for encryption and decryption. For more information, see AWS KMS Pricing.
  • You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams whose records were encrypted by clients, as in this sample application.
  • If your application requires low-latency processing, note that encrypting and decrypting each record adds a small amount of latency.

The following diagram shows the architecture of the solution.

Encrypting the records at the producer

Before you call the PutRecord or PutRecords API, you will encrypt the string record by calling KinesisEncryptionUtils.toEncryptedString.

In this example, we used a sample stock sales ticker object:

{"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}

The KinesisEncryptionUtils.toEncryptedString method takes four parameters:

  • com.amazonaws.encryptionsdk.AwsCrypto
  • stock sales ticker object
  • com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • java.util.Map of an encryption context

The ciphertext is returned to the caller, which then checks its size by calling KinesisEncryptionUtils.calculateSizeOfObject. Encryption increases the size of an object. To prevent the object from being throttled, the size of the payload (one or more records) is validated to ensure that it is not greater than 1 MB. In this example, encrypted records whose payload exceeds 1 MB are logged as a warning. If the size is within the limit, the record is sent by calling addUserRecord if you are using the KPL, or PutRecord or PutRecords if you are using the Kinesis Streams API.
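The post does not show how calculateSizeOfObject is implemented. As a minimal sketch of the same idea, assuming the size that matters is the UTF-8 byte length of the encrypted string, the check could look like the following. The class name, method names, and constant are hypothetical; the 1024000 threshold simply mirrors the value used in the snippets in this post.

```java
import java.nio.charset.StandardCharsets;

class RecordSizeCheck {
    // Assumed threshold, copied from the snippets in this post (not an official constant).
    static final int MAX_RECORD_BYTES = 1_024_000;

    /** Returns the number of bytes the string occupies once it is UTF-8 encoded. */
    static int utf8Size(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True when the UTF-8 encoded string is larger than the assumed threshold. */
    static boolean exceedsLimit(String s) {
        return utf8Size(s) > MAX_RECORD_BYTES;
    }

    public static void main(String[] args) {
        String sample = "AYADeMf6zmVg9JvIkGNv5M39rhUA";
        System.out.println(utf8Size(sample) + " bytes, exceeds limit: " + exceedsLimit(sample));
    }
}
```

Measuring the encoded bytes rather than String.length() matters in general, because characters outside ASCII take more than one byte in UTF-8.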

Example: Encrypting records with KPL

// Encrypt the record
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
// Warn if the encrypted record is greater than 1 MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000) {
    log.warn("Record added is greater than 1MB and may be throttled");
}
// UTF-8 encode the encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
// Add the encrypted record to the stream
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
Futures.addCallback(f, callback);

In the above code, the example sales ticker record is passed to KinesisEncryptionUtils.toEncryptedString and an encrypted string is returned. The encryptedString value is then passed to KinesisEncryptionUtils.calculateSizeOfObject, and the size of the encrypted payload is compared against the 1 MB limit. Note that this snippet only logs a warning when the limit is exceeded; it does not skip the record. The payload is then UTF-8 encoded (KinesisEncryptionUtils.toEncryptedByteStream) and sent to the stream for processing.
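The body of toEncryptedByteStream is not shown in this post. A minimal sketch of the UTF-8 step, assuming it simply wraps the encoded bytes in a ByteBuffer, might look like this, together with the matching decode that a consumer would apply. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class Utf8Buffers {
    /** Encodes the string as UTF-8 and wraps the bytes in a ByteBuffer. */
    static ByteBuffer toByteBuffer(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes UTF-8 bytes back to a string without moving the caller's buffer position. */
    static String fromByteBuffer(ByteBuffer buffer) {
        // duplicate() shares the bytes but has its own position and limit.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer data = toByteBuffer("AYADeMf6zmVg9JvIkGNv5M39rhUA");
        System.out.println(fromByteBuffer(data));
    }
}
```

Decoding from a duplicate leaves the original buffer readable, which is useful if the same ByteBuffer is logged first and decrypted afterwards.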

Example: Encrypting the records with Streams PutRecord

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
    log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
//putting the record into the stream
kinesis.putRecord(putRecordRequest);
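Both producer snippets call randomPartitionKey(), which is not defined in this post. One plausible sketch, assuming the goal is only to spread records evenly across shards, is a random 128-bit number rendered as a decimal string. The class name is hypothetical and the implementation is an assumption, not the sample application's code.

```java
import java.math.BigInteger;
import java.security.SecureRandom;

class PartitionKeys {
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Returns a random 128-bit value as a decimal string (at most 39 digits). */
    static String randomPartitionKey() {
        return new BigInteger(128, RANDOM).toString(10);
    }

    public static void main(String[] args) {
        System.out.println(randomPartitionKey());
    }
}
```

A random key gives up per-key ordering, so it only suits workloads where records do not need to land on the same shard as related records.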

Verifying that records are encrypted

After the call to KinesisEncryptionUtils.toEncryptedString, you can print out the encrypted string record just before UTF-8 encoding. An example of what is printed to standard output when running this sample application is shown below.

[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=

You can also verify that the record stayed encrypted in Streams by printing out the UTF-8 decoded received record immediately after the getRecords API call. An example of the print output when running the sample application is shown below.

[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=
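Beyond eyeballing the log output, you can do a rough programmatic sanity check on the Base64 string. The sketch below assumes the version 1 message header layout documented for the AWS Encryption SDK, in which the first byte is the format version, the second is the message type, and the next two bytes are the algorithm ID. The class and method names are hypothetical, and the check says nothing about whether the message can actually be decrypted.

```java
import java.util.Base64;

class CiphertextHeaderPeek {
    /** Returns {version, type, algorithmId} read from the first four decoded bytes. */
    static int[] peek(String base64Ciphertext) {
        // The first 8 Base64 characters decode to exactly 6 bytes, enough for the fields read here.
        byte[] head = Base64.getDecoder().decode(base64Ciphertext.substring(0, 8));
        int version = head[0] & 0xFF;
        int type = head[1] & 0xFF;
        int algorithmId = ((head[2] & 0xFF) << 8) | (head[3] & 0xFF);
        return new int[] {version, type, algorithmId};
    }

    public static void main(String[] args) {
        // Prefix of the encrypted record string printed by the producer above.
        int[] h = peek("AYADeMf6zmVg9JvIkGNv5M39rhUA");
        // Prints: version=0x01 type=0x80 algorithm=0x0378
        System.out.printf("version=0x%02X type=0x%02X algorithm=0x%04X%n", h[0], h[1], h[2]);
    }
}
```

Both sample strings in this section begin with the same four bytes, which is consistent with them being produced by the same SDK configuration.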

Decrypting the records at the consumer

After you receive the records into your consumer as a list, you can get the data as a ByteBuffer by calling record.getData. You then decode and decrypt the ByteBuffer by calling KinesisEncryptionUtils.decryptByteStream. This method takes five parameters:

  • com.amazonaws.encryptionsdk.AwsCrypto
  • record ByteBuffer
  • com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • key ARN string
  • java.util.Map of your encryption context

A string representation of the ticker sales object is returned to the caller for further processing. In this example, this representation is just printed to standard output.

[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}

Example: Decrypting records with the KCL and Streams API

ByteBuffer buffer = record.getData();
//Decrypting the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto, buffer, prov, this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);

With the above code, records in the Kinesis stream are decrypted using the same key ARN and encryption context that were used to encrypt them on the producer side.
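To see why the context on the consumer has to match the one used by the producer, it can help to look at the underlying idea in isolation. The sketch below is a local stand-in that uses the JDK's AES-GCM with additional authenticated data; it is not the AWS Encryption SDK or AWS KMS, and every name in it is hypothetical. It only illustrates that a non-secret context can be bound to a ciphertext so that decryption with a different context is rejected.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Optional;
import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

class ContextBindingDemo {
    private static final int TAG_BITS = 128;

    /** Generates a local AES key; a stand-in for a KMS-protected data key. */
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Generates a fresh 12-byte IV; never reuse an IV with the same key. */
    static byte[] newIv() {
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        return iv;
    }

    /** Encrypts the plaintext and authenticates the context alongside it. */
    static byte[] encrypt(SecretKey key, byte[] iv, String context, String plaintext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            cipher.updateAAD(context.getBytes(StandardCharsets.UTF_8));
            return cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns the plaintext, or empty if the context or ciphertext fails authentication. */
    static Optional<String> tryDecrypt(SecretKey key, byte[] iv, String context, byte[] ciphertext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            cipher.updateAAD(context.getBytes(StandardCharsets.UTF_8));
            return Optional.of(new String(cipher.doFinal(ciphertext), StandardCharsets.UTF_8));
        } catch (AEADBadTagException e) {
            return Optional.empty();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] iv = newIv();
        byte[] ciphertext = encrypt(key, iv, "stream=orders", "AMZN,900");
        System.out.println("same context:  " + tryDecrypt(key, iv, "stream=orders", ciphertext));
        System.out.println("other context: " + tryDecrypt(key, iv, "stream=other", ciphertext));
    }
}
```

In the sample application the key handling and message format are managed by the SDK and KMS, so treat this purely as an illustration of context binding rather than a replacement for them.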

Maven dependencies

To use the implementation outlined in this post, add the Maven dependencies below to your pom.xml: the Bouncy Castle libraries, which provide a cryptography API for Java, and the AWS Encryption SDK for Java.

<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-ext-jdk15on</artifactId>
    <version>1.54</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-encryption-sdk-java</artifactId>
    <version>0.0.1</version>
</dependency>

Summary

You can incorporate the sample code snippets above, or use them as a guide in your application code, to start encrypting and decrypting your records to and from an Amazon Kinesis stream.

A complete producer and consumer example application, and a more detailed step-by-step example of developing an Amazon Kinesis producer and consumer application on AWS with encrypted records, are available in the kinesisencryption GitHub repository.

If you have questions or suggestions, please comment below.

About the Author

Temitayo Olajide is a Cloud Support Engineer with Amazon Web Services. He works with customers to provide architectural solutions, support, and guidance for implementing high-velocity streaming data applications in the cloud. In his spare time, he plays ping-pong and hangs out with family and friends.
