1. 程式人生 > 實用技巧 >使用DataStax Java驅動程式的最佳實踐

使用DataStax Java驅動程式的最佳實踐

引言

如果您想開始建立自己的基於Cassandra的Java程式,歡迎!


也許您已經參加過我們精彩的DataStax Academy課程或開發者大會,又或者仔細閱讀過Cassandra Java驅動的文件。相比示例程式,現在是時候步入真實世界、處理實際案例了。


那麼問題來了:Java驅動提供了各種設定選項,在開始使用時我們需要知道哪些一般準則,以及從什麼最佳實踐入手,從而輕鬆構建一個在生產環境中有復原力的、實時的、高效能應用呢?


這是個好問題!基於大量和您類似的Datastax客戶中所累積的經驗,本文件將列出一些基本的準則。


在DataStax Java 驅動中有很多的旋鈕和拉桿,並且每一個都有其存在的理由。但是絕大部分的使用者不需要深入瞭解那些更深奧高階的功能,所以這篇文件將會重點介紹大部分應用程式會遇到的一般情況。在這個過程中,我們會再詳細介紹提到的高階選項,但是他們確實不是一般情況。 現在就讓我們先關注一下主要情況吧。


在本文開始前,我們預設您已經對這些內容有所瞭解:客戶端與服務端架構的應用程式、Cassandra基礎,還有Datastax Java驅動的主要元素,例如Sessions(會話)、Statements(語句)、Results sets(結果集)、Rows(行)。附錄中提供了一些幫助您熟悉這些概念的資源。

最佳實踐

這一部分介紹了使用DataStax Java驅動建立一個Java程式的最佳實踐方法。部分最佳實踐的方法也可以運用到其它的開發語言中(及其關聯的DataStax驅動),但是在本文中我們將會專注於Java驅動。我們將最佳實踐的方法分為4組:總體指導、建立連線、查詢語句和查詢結果。

總體指導

  • 當今最佳的實踐方法是使用最新的DataStax Java驅動,即版本4.x。雖然3.x版本的驅動仍然可用,且在必要情況下依然是個不錯的選擇,本文會把重點放在4.x的驅動。

  • 使用您正在使用的Datastax Java驅動主要版本(比如4.x)中最新的細分版本(比如截止2020年8月,4.x中的4.9.0或3.x中的3.10.0)。

  • 多資料中心部署的情況下,我們建議一個應用例項僅與單獨一個數據中心掛鉤使用。換句話說,如果您要將您的應用部署到多個地區,您應該部署多個應用,每個地區分別對應一個應用。每一個應用都應將驅動程式連線到每一個當地的資料中心(如下圖所示),並且應使用Global Load Balancer(全域性負載平衡器)在多個應用例項中引導流量。

    • 如果Cassandra在一個地區的資料中心無法訪問(如離線以進行定期維護),Global Load Balancer會將流量導到其它資料區域中的應用程式例項。

  • 還有一個最佳的實踐方法是使用與資料庫查詢有關的指標來對應用程式進行監測。這個會非常有助於測試應用程式的查詢效能。更多關於驅動程式監測的指標請參考這裡。在預設情況下所有的指標收集功能都是處於禁用狀態。如果您需要它們,您需要在建立連線的時候啟用。詳見下面資訊。

    • 如果一個監測指標處於啟用狀態,您可以通過Session#getMetrics()方法獲得指標內容。

建立與資料庫的連線

用於建立與Cassandra的連線的CqlSession物件,可以通過很多不同的方式進行配置,包括通過配置檔案或者程式設計方式建立連線——我們建議使用配置檔案。當我們與Cassandra建立連線時,以下是一些最重要的考慮因素:

  • 請使用單獨的CqlSession:一個應用程式只使用一個單獨的CqlSession物件來連線到資料庫。在一些更復雜的情況下,有可能會有多個Class,每一個Class 都需要連線到一個CqlSession。在這種情況下,我們希望它們能使用同一個CqlSession,因此最好將其建立為單例模式。一個比較常見的方法是使用依賴注入框架,例如Spring。

  • 在application.conf裡設定選項:將所有非預設的選項在jar包裡的application.conf檔案中進行定義。這個設定檔案根據型別安全配置框架(Typesafe Config framework)設定引數。reference.conf檔案包括了所有的預設值,同時由於application.conf是基於它衍生出來的,因此您只需在applicaiton.conf裡顯式地指出任何想要複寫的值。

  • 遵循您安全性的最佳做法,同時使用適當的身份驗證和SSL選項。更多關於身份驗證SSL加密內容請參照DataStax Java驅動的文件。

  • 在建立連線的時使用多個接觸點(Contact Points)。這樣的話,您的應用即使在單個(或多個節點)離線時仍可與資料庫建立連線。特意選擇(或避免)種子節點並沒有任何益處。

datastax-java-driver.basic.contact-points = ["127.0.0.1:9042","localhost:9042"]
    • 只提供一個數據中心的接觸點,這個資料中心會被設為本地的資料中心(如下所示)。

    • 沒必要將一個數據中心所有的節點都設定為接觸點。當驅動程式建立初始連線後,它會發現叢集中所有剩下的節點,並通過負載平衡策略與這些節點建立直接的連線。

    • 當建立CqlSession時,您可以通過程式設計的方式設定連線點,這會複寫application.conf中的引數。

CqlSession.builder().addContactPoint(new InetSocketAddress("127.0.0.1", 9042));
  • 使用預設的負載平衡策略:當建立連線時,使用預設引數DefaultLoadBalancingPolicy。此負載平衡策略會更有效且相對平衡地呼叫節點進行查詢。

  • 當建立連線時,顯式地指定使用本地的資料中心。
datastax-java-driver.basic.load-balancing-policy.local-datacenter = dc1
    • 當建立CqlSession時,您也可以通過程式設計的方式設定本地資料中心,這會複寫application.conf中的引數。

CqlSession.builder().withLocalDatacenter("dc1")
  • 顯式地將一致性級別設定為LOCAL_QUORUM並將預設序列一致性設為LOCAL_SERIAL。不然,預設的一致性是LOCAL_ONE,預設序列一致性級別為SERIAL,這些通常都不建議使用。

datastax-java-driver.basic.request.consistency = LOCAL_QUORUM
datastax-java-driver.basic.request.serial-consistency = LOCAL_SERIAL

  • 不要將預設查詢冪等性設定為true。其預設值為false,請保留它。設定為true很危險,因為一些操作是通過冪等查詢自動完成的,但事實上不是所有操作都是冪等的。因此,請針對每次查詢顯式明確地設定冪等查詢操作。

  • 請在advanced.metrics中啟用合適的指標,從而啟用指標收集功能。因為沒有一個“啟用所有指標”的選項,您必須明確指出您需要啟動的每一個指標。

    • advanced.metrics.session指定會話級(session-level)指標。下面列出了一些您可能感興趣的advanced.metrics.session.enabled中的指標:

advanced.metrics.session.enabled = [bytes-sent, bytes-received, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.queue-size, throttling.errors]

    • advanced.metrics.node 指定了節點級(node-level)指標,下面列出了一些您可能感興趣advanced.metrics.node.enabled:中的指標:

advanced.metrics.node.enabled = [pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams, bytes-sent, bytes-received, cql-messages, errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others, retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other, ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other, errors.connection.init, errors.connection.auth]

執行查詢

Cassandra的查詢會先建立一個Statement物件,再通過CqlSession物件執行。以下有很多種Statement種類:

  • 簡易語句(SimpleStatement): 由CQL字串或查詢生成器(Query Builder)建立。

  • 引數化查詢語句(PreparedStatement): 可以在構建一次後被重複使用多次的語句,對於有不同引數的常見查詢而言,此種語句具有實用、高效能且更安全的優勢。

  • 繫結語句(BoundStatement): 用於對PrepareStatement查詢的單次呼叫,並允許使用者繫結只適用於此次呼叫的引數。

  • 批處理語句(BatchStatement): 封裝多個簡易語句或者繫結語句,並批量執行。

當執行查詢語句時,這裡有一些需要注意的重要事項:

  • 請注意在DataStax Java驅動程式4.x的版本中,所有的Statements都是不可變的(immutable)。因此如果想要重新設定Statement的選項的話,您必須為這個值重新分配一個引用。

// 不正確 - bound2 的值並沒有改變。
bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
// 正確 - bound2 的值發生了改變。
bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);

  • 避免以程式設計方式發出DDL語句,除非可以確保它們不會被應用程式中的多個例項併發執行。
    • 當叢集中的節點對模式(Schema)產生分歧時,可能會因Cassandra分散式的特性出現問題。

    • 使用DDL語句時,應確保模式協議(schema agreement)達成一致。您可以通過呼叫Session#getExecutionInfo()方法獲取ExecutionInfo, 之後呼叫ExecutionInfo#isSchemaInAgreement()方法。如果返回True,那您就可以確保所有節點達成一致。

    • 您也可以在任何時候通過呼叫Session#checkSchemaAgreement()的方法判斷所有節點的模式是否一致。就好比,如果上述的方法返回了False,則可以進入一個迴圈直到Session#checkSchemaAgreement()返回為真(在發生異常之前會有一些超時響應或者計數器)。

  • 不要將CQL作為字串執行(如通過CqlSession#execute(String)),而是為CQL字串建立一個Simple Statement以便設定Statement的選項,如一致性級別或者冪等性。

SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
stmt1 = stmt1.setIdempotent(true);

  • Java驅動程式的 QueryBuilder(查詢生成器)是另一個很棒的通過程式設計建立CQL查詢語句的方式。 這也比與用程式設計方式構建CQL字串想法的動態CQL更可取。

SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
                .columns("pkey", "x").build();

  • 通過CqlSession#prepare()準備所有可以重複使用的語句。

SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
PreparedStatement prepared2 = session.prepare(stmt3);
BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);

    • 您也可以通過按位置而不是名字來繫結變數,但是當被執行的查詢語句發生變化,使用位置繫結變數會更容易出錯,因為這就意味著您需要更注意繫結變數的位置。

SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
PreparedStatement prepared =  session.prepare(stmt2);
BoundStatement bound1 = prepared.bind();
bound1 = bound1.setInt(0, 10);
bound1 = bound1.setInt(1, 20);

    • 當值繫結到準備好的語句時,請使用“unset”(未設定)選項來避免任何插入NULL值的可能性。或者,您可以僅將不為空的值與變數進行繫結,因為未繫結的繫結標記(bind marker)同樣會導致該列被設定為“unset”。

BoundStatement bound2 = prepared2.bind();
bound2 = bound2.setInt("pkey", 100);
bound2 = bound2.setInt("x", 200);
bound2 = bound2.unset(1); // 為了展示需要
  • 如果一個CQL查詢確實是冪等的,請通過Statement#setIdempotent()來設定冪等的選項。這是為了能夠自動重複執行陳述語句(如:自動重試或者推測執行)。作為安全預防措施,驅動程式不會自動重新處理任何冪等設定為false(否)的語句。

bound2 = bound2.setIdempotent(true);

  • 謹慎使用Batch Statement(批處理語句),因為Cassandra叢集的協調節點(coordinator)負責在內部處理這些查詢,發出多個查詢會增加協調節點的負荷。

    • 批處理語句的數量應該保持相對較少。 如小於20條語句。

    • 應只在用例需要時才使用logged batches(記入日誌的批處理語句), 因為它們會佔用更多資源。

    • 更多細節請查詢這裡

BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
BoundStatement bound3 = prepared.bind(1000,2000);
BoundStatement bound4 = prepared.bind(10000,20000);
batchBuilder.addStatement(bound3);
batchBuilder.addStatement(bound4);
BatchStatement batch = batchBuilder.build();
  • 我們應該避免Light weight transactions(輕量級事務),因為它們的效能相較於一般操作會慢很多。一般情況下,會有其它可以避免Light weight transactions的方式來建構應用程式。總之,輕量級事務的存在是有原因的,但請合理使用。

    • 輕量級事務和一般事務的查詢語句一樣。

    • 請確保將預設的SERIAL的序列一致性級別複寫為LOCAL_SERIAL。

處理查詢和結果

根據查詢被執行的方式,查詢結果會通過以下幾種方式被返回:

  • Synchronous execution(同步執行):返回一個可以被同步處理的ResultSet

ResultSet resultSet = session.execute(read);

  • Asynchronous execution(非同步執行):返回一個可以通過CompletionStage API 非同步處理查詢的CompletionStage<AsyncResultSet>。

CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);

  • Reactive execution(響應式執行):返回一個ReactiveResultSet,它可以通過Reactive Labraries (響應庫)以響應式的方法處理結果。

ReactiveResultSet reactiveResult = session.executeReactive(bound2);

最常見的處理方式排列序可能為Synchronous > Reactive > Asynchronous.

以下是一些處理查詢結果的最佳實踐方法:

  • 養成在CQL查詢中指定所有需要被返回的列的習慣。比如但凡可以請避免使用 "SELECT * FROM …"這樣的查詢語句。如果表的結構將來發生變化的話,這樣做會有幫助。

  • 像其它資料庫一樣,請注意像“SELECT * FROM ks.tbl”這樣的簡單語句會返回資料庫中的所有資料。Cassandra是為儲存海量資料而設計的,這種請求很可能會返回一個含有巨量資料的結果。儘管有時候確實需要進行全表掃描,但也請儘量避免這種情況。

    • 如果確實需要全表掃描,利用好的分散式計算方法,或者像Spark這樣的分散式計算框架是一個不錯的選擇。DataStax為此開發了Spark Cassandra Connector (Cassandra Spark聯結器),實現了資料掃描的最佳實踐。

  • 當處理多個語句時,利用非同步執行(asynchronous execution)同時處理多條查詢。

    • 當載入資料時,這個方式可以大量縮短處理時間。

    • 當從多張表格獲取資料時,這可以減少總體延遲。

    • 確保所有非同步操作已被處理併成功執行。

    • 更多詳情請參考這裡

  • 對於INSERT、UPDATE或DELETE的操作,請確保捕獲異常以確保操作成功。

    • 如果一個查詢因為查詢超時失敗,請嘗試重新嘗試查詢,因為Cassandra分散式的特性應該可以成功。如果一個查詢超時,它會丟擲一個異常(如 QueryExecutionException),您可以使用try-catch結構捕獲這個異常。

  • 一般情況下,使用驅動程式預設的分頁方法,這種方法在一頁讀取完畢時將自動獲取下一頁。

    • 響應式處理也可以自動處理分頁功能。

    • 通過CompletionStage API使用自動分頁。而使用AsynchronousResultSet這種手動處理API則需要手動分頁。

  • 避免呼叫 ResultSet#all(),因為這會將所有結果匯入一個在記憶體中的列表(List),這樣會影響效能或產生記憶體錯誤

    • 與之相對,可以利用the ResultSet#iterator()方法迭代取出返回結果的每一行。

for (Row row : resultSet) {
    System.out.println("pkey: " + row.getInt("pkey"));
}

總結

本文包含了許多適應大多數用例且最常見的最佳實踐。但是在某些情況下,使用其它不同的設定或方法也許確實是得當的,所以這篇文章應該只作為一個起點。我們建議可以從這些設定和實踐中入手,開始建立一個可用的應用程式。如果您發現應用程式沒有如預期一樣執行,可以考慮參考DataStax Java驅動程式文章(或者其它資源)來微調您的設定。


有了這些最佳實踐作為您的起點,您正走在通往順利搭建以Cassandra作為後臺的應用程式的光明大道上。請享受這個過程吧!

附件 1 - 有用的連結

附件 2 - 程式碼案例

以下程式碼也可以在以下Github repository連結。 https://github.com/DataStax-Examples/ex_bestpractices

附件 2.1 - application.conf

datastax-java-driver {
 # Contact Points
 basic.contact-points = ["127.0.0.1:9042","localhost:9042"]

 # Local Data Center
 basic.load-balancing-policy.local-datacenter = dc1

 # Default Consistency Level
 basic.request.consistency = LOCAL_QUORUM
 basic.request.serial-consistency = LOCAL_SERIAL

 # Metrics
 advanced.metrics {
   # The session-level metrics
   session {
     enabled = [
       bytes-sent, bytes-received,
       connected-nodes,
       cql-requests, cql-client-timeouts, cql-prepared-cache-size,
       throttling.delay, throttling.queue-size, throttling.errors,
     ]
   }
   # The node-level metrics.
   node {
     enabled = [
       pool.open-connections, pool.available-streams, pool.in-flight, pool.orphaned-streams,
       bytes-sent, bytes-received,
       cql-messages,
       errors.request.unsent, errors.request.aborted, errors.request.write-timeouts, errors.request.read-timeouts, errors.request.unavailables, errors.request.others,
       retries.total, retries.aborted, retries.read-timeout, retries.write-timeout, retries.unavailable, retries.other,
       ignores.total, ignores.aborted, ignores.read-timeout, ignores.write-timeout, ignores.unavailable, ignores.other,
       errors.connection.init, errors.connection.auth
     ]
   }
 }
}

附件 2.2 - SampleApplication.java

package com.datastax.example.bestpractices;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException;
import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import reactor.core.publisher.Flux;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;

public class SampleApplication {
   public static void main(String[] args) {
       final String dc = "dc1";
       final String ks = "ks";
       final String tbl = "tbl";
       final int ddlRetries = 3;
       final int ddlRetrySleepMs = 500;
       // Create CqlSession
       CqlSession session = CqlSession.builder()
//                .withAuthCredentials("cass_user", "choose_a_better_password")  // redundant with application.conf, but showing for demonstration
//                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))     // redundant with application.conf, but showing for demonstration
               .withLocalDatacenter("dc1")
               .build();

       // Set up driver metrics and expose via JMX (as an example)
       MetricRegistry registry = session.getMetrics()
               .orElseThrow(() -> new IllegalStateException("Metrics are disabled"))
               .getRegistry();

       JmxReporter reporter =
               JmxReporter.forRegistry(registry)
                       .inDomain("com.datastax.oss.driver")
                       .build();
       reporter.start();

       // Create Keyspace
       SimpleStatement createKeyspace = SimpleStatement.newInstance("CREATE KEYSPACE IF NOT EXISTS "+ks
               +" WITH replication = {'class': 'NetworkTopologyStrategy', '" + dc + "': '1'}");
       // Synchronous
       ResultSet ddl = null;
       try {
           ddl = session.execute(createKeyspace);
       }
       catch (Exception e) {
           throw new RuntimeException("Error creating keyspace");
       }
       if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
           int retries = 0;
           while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
               try {
                   Thread.sleep(ddlRetrySleepMs);
               }
               catch (InterruptedException ie) {
                   throw new RuntimeException("Interrupted while waiting for schema agreement");
               }
               retries++;
           }
       }

       // Create Table (using helper method)
       SimpleStatement createTable = SimpleStatement.newInstance("CREATE TABLE IF NOT EXISTS "+ks+"."+tbl
               +"(pkey INT, x INT, PRIMARY KEY ((pkey)))");
       if (!executeDdl(session, createTable, ddlRetries, ddlRetrySleepMs)) {
           System.err.println("Error creating table");
           System.exit(1);
       }

       // Insert some data
       // Simple
       SimpleStatement stmt1 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (1,2)");
       stmt1 = stmt1.setIdempotent(true);
       try {
           session.execute(stmt1);
       }
       catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException("Error inserting first data");
       }

       // Prepared
       // Using positional bind markers
       SimpleStatement stmt2 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (?,?)");
       PreparedStatement prepared =  session.prepare(stmt2);
       BoundStatement bound1 = prepared.bind();
       bound1 = bound1.setInt(0, 10);
       bound1 = bound1.setInt(1, 20);
       // Async
       try {
           CompletionStage<AsyncResultSet> asyncResult = session.executeAsync(bound1);
           asyncResult.toCompletableFuture().get();
       }
       catch (Exception e) {
           // process exception
       }

       // Reactive
       // Using named bind markers instead of positional
       SimpleStatement stmt3 = SimpleStatement.newInstance("INSERT INTO "+ks+"."+tbl+"(pkey,x) VALUES (:pkey,:x)");
       PreparedStatement prepared2 = session.prepare(stmt3);
       BoundStatement bound2 = prepared2.bind();
       bound2 = bound2.setInt("pkey", 100);
       bound2 = bound2.setInt("x", 200);
       bound2 = bound2.unset(1); // Showing for demonstration
       bound2 = bound2.setIdempotent(true); // set idempotency
       bound2 = bound2.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
       ReactiveResultSet reactiveResult = session.executeReactive(bound2);
       Flux.from(reactiveResult).blockLast();

       // Batch
       BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
       BoundStatement bound3 = prepared.bind(1000,2000);
       BoundStatement bound4 = prepared2.bind().setInt("pkey",10000).setInt("x", 20000);
       batchBuilder.addStatement(bound3);
       batchBuilder.addStatement(bound4);
       BatchStatement batch = batchBuilder.build();
       try {
           session.execute(batch);
       }
       catch (QueryExecutionException qee) {
           // Handle query timeout - let's retry it
           try {
               session.execute(batch);
           }
           catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
               // Handle query failure
               throw new RuntimeException("Error second try inserting data");
           }

       }
       catch (QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException("Error inserting data");
       }


       // Query Builder
       SimpleStatement read = QueryBuilder.selectFrom(ks, tbl)
               .columns("pkey", "x").build();
       // Using a helper method to execute DML
       ResultSet resultSet = executeDml(session, read, "Error reading data");
       if (null != resultSet) {
           for (Row row : resultSet) {
               System.out.println("pkey: " + row.getInt("pkey") + ", x: " + row.getInt("x"));
           }
       }

       // Cleanup
       session.close();
   }

   private static boolean executeDdl(CqlSession session, Statement ddlStatement, int ddlRetries, int ddlRetrySleepMs) {
       ResultSet ddl = null;
       try {
           ddl = session.execute(ddlStatement);
       }
       catch (Exception e) {
           throw new RuntimeException("Exception while executing DDL (" + ddlStatement + ")");
       }
       if (!ddl.getExecutionInfo().isSchemaInAgreement()) {
           int retries = 0;
           while ((retries < ddlRetries) && session.checkSchemaAgreement()) {
               try {
                   Thread.sleep(ddlRetrySleepMs);
               }
               catch (InterruptedException ie) {
                   throw new RuntimeException("Interrupted while waiting for schema agreement");
               }
               retries++;
           }
       }
       return true;
   }

   private static ResultSet executeDml(CqlSession session, Statement query, String errorString) {
       ResultSet resultSet = null;
       try {
           resultSet = session.execute(query);
       }
       catch (QueryExecutionException | QueryValidationException | AllNodesFailedException ex) {
           // Handle query failure
           throw new RuntimeException(errorString);
       }
       return resultSet;
   }
}