1. 程式人生 > >基於canal資料加工系統

基於canal資料加工系統

資料加工系統開發文件

1.資料加工系統主流程

這裡寫圖片描述

2.服務介紹

整個框架由4個服務組成:canal-server、canal-client、bimq-consumer、crm-bi-task四個服務組成

canal-server:主要負責獲取mysql伺服器的binlog日誌,按照使用者提交的事務維度解析為結構化資料,然後儲存於記憶體中,並提供tcp服務。

canal-client:通過connect到canal-server從記憶體中pull資料,以mq.schedule.table.queue的方式建立訊息佇列,傳送具體的訊息到佇列,訊息按照每條資料庫記錄變更前、變更後、操作型別(insert、update、delete)形式組織。

bimq-consumer:主要負責從具體的資料庫記錄變更到資料加工系統資料的提取過程,其一般通過監控一個表或者幾個表的資料,通過業務實現具體的業務邏輯(如多個表格的join,或者一些magic number到列舉型別的轉化過程)獲取到最終可以使用的資料加工系統資料。其中根據具體業務場景的不同提供了三種方式將資料加工系統儲存到elasticsearch中,從而提供給具體業務分析和使用該資料。
方式1(實時):該方式一般是用於處理近實時和增量的使用者資料的新增、更新等操作。
方式2(歷史):使用者也可以通過自己實現定時任務,從資料庫中提取最終的資料加工系統資料,然後更新到elasticsearch中。
方式3(第三方):除了支援基於該框架來實現資料加工系統資料的更新外,還可以通過呼叫提供的dubbo服務來更新資料,從而解決一些資料不是儲存到mysql的情形。
另外該服務還提供了通過sql來查詢elasticsearch中資料加工系統資料的操作,例如支援基本查詢語句(select)、聚合操作(group)、度量操作(sum、count、avg)。

crm-bi-task:主要負責將使用者提供的將資料加工系統資料寫到elasticsearch中的功能。
綜合來說:業務方只需要在bimq-consumer中完成從mysql變更資料到資料加工系統資料的業務邏輯開發和歷史資料提取的工作。

3.開發流程

3.1 申請資料庫表的日誌變更監控(業務方完成)

業務方如果希望實現通過監控資料表的方式來實時更新資料加工系統資料,需要將要監控的資料庫和表名給運維人員或者相關負責人。
如:highso_db1.callthinklog

3.2 配置資料表的監控(運維人員完成)

服務名:canal(即canal-server)
canal是支援不重啟服務,新增資料庫表監控的。相關運維人員只需要按照格式,新增具體的配置項到服務配置目錄下就行了,canal會每隔5秒掃描指定目錄下的配置,如果有配置的變更,會啟動或者重啟相關的instance,從而達到不需要重新啟動jvm而新增新的監控的目的,同時在新增監控時,也不會影響現有服務的使用。
新增配置如下:

1)在canal.property中修改屬性:canal.destinations 如:新增callthinlog canal.destinations=looyudata 到canal.destinations=looyudata,callthinklog
2)copy已經有的資料夾重名為callthinklog,然後更改canal.instance.defaultDatabaseName和canal.instance.filter.regex(如果資料庫的連線資訊有變更需要,更新相關連線資訊)。

當更新完配置後,最遲5秒後該表的canal-server就已經啟動成功

3.3 啟動client獲取server中的資料資訊(運維人員完成)

服務名:bimq-connector-canal(即canal-client)
啟動client:更改client【bimq-connector-canal】在disconf上的配置即可自動啟動client。如:canal.destinations=xxx.table1 為 canal.destinations=xxx.table1,xxx.table2。
程式會根據改屬性的變更,自動的啟動或者關閉與server連線的client服務。
另外提供了CanalOperateController來實現單個client的手動啟動、關閉和獲取所有正在執行的client列表

3.4 業務開發資料加工系統邏輯(業務方)

服務名:bimq-connector-consumer

3.4.1 資料加工系統mq-consumer開發【實時】

對於增量或者實時的資料加工系統資料的新增或者更新,只需要實現一個mq的consumer來監聽由canal-client產生的mq訊息即可。
1. 業務方需要定義一個數據加工系統的DTO,並繼承於BiSerializable,並放在同一個目錄下。【因為程式碼會自動掃描該目錄下的類並建立相關的資料加工系統需要的mq佇列等資訊,如CustomerDTO 對應queue=mq.T1DTO.queue routing=mq.T1DTO.routing

/**
 * 使用者TDO
 */
@Data
public class T1DTO extends BiSerializable {

    /**
     * 使用者ID
     */
    private Long id;

    /**
     * 註冊時間
     */
    private Date registerdate;

    /**
     * 註冊位置
     */
    private String registerplace;

}
  1. 實現一個mq消費者並整合,用於消費canal-client產生的機會,具體程式碼可以參見T1Consumer。

    定義要消費的queue的名字常量:mq.資料庫名.資料表名.queue

public class QueueConsumerConstants {
    public static final String T1_QUEUE = "canal.xxxx.t1.queue";
    public static final String T2_QUEUE = "canal.xxxx.t2.queue";

    public static final String T3_QUEUE = "canal.xxxx.t3.queue";
    public static final String T4_QUEUE = "canal.xxxx.t4.queue";


    public static final String T5_QUEUE = "canal.xxxx.t5.queue";
    public static final String T6_QUEUE = "canal.xxxx.t6.queue";
    public static final String T7_QUEUE = "canal.xxxx.t7.queue";
    public static final String T8_QUEUE = "canal.xxxx.t8.queue";
}
繼承AbstractConsumer,實現具體的業務方程式碼邏輯
1.handleUpdate方法:beforeMap資料記錄變更前的key-value  afterMap資料記錄變更後的key-value. 當提取出需要資料加工系統DTO物件後,通過呼叫ConsumerMessageBuilder.buildUpdateMessageVo產生需要的返回值
2.handleInsert方法:處理資料庫的插入操作
3.handleDelete方法:處理資料庫的刪除操作

@Component
@RabbitListener(queues = QueueConsumerConstants.T1_QUEUE,
        containerFactory = "rabbitListenerContainerFactory")
@Slf4j
public class T1Consumer extends AbstractConsumer {

    @Override
    protected List<ConsumerMessageBuilder.MessageVo> handleUpdate(String tableName,
            Map<String, Object> beforeMap, Map<String, Object> afterMap) throws Exception {
        return null;
    }

    @Override
    protected List<ConsumerMessageBuilder.MessageVo> handleInsert(String tableName,
            Map<String, Object> afterMap) throws Exception {


        Customer after = BeanCovertUtil.convertToJavaBeanIgnoreCase(afterMap, Customer.class);
        CustomerDTO dto = BeanCovertUtil.copy(after, CustomerDTO.class);

        dto.setNewKey(dto.getId().toString());

        log.info("新增使用者,id:{}",after.getId());

        return Lists.newArrayList(ConsumerMessageBuilder.buildInsertMessageVo(dto));
    }

    @Override
    protected List<ConsumerMessageBuilder.MessageVo> handleDelete(String tableName,
            Map<String, Object> beforeMap) throws Exception {
        return null;
    }

}

PS: 每個DTO物件都需要設定newKey或者oldKey,es會將其作為主鍵更新或者刪除其資料
其中的註解中的queues為使用者需要監聽的資料表佇列

3.4.2 資料加工系統定時任務開發【歷史資料】

定時任務主要解決歷史資料加工系統資料的提取到elasticsearch的過程
其中對於大家常用的單表的資料表遍歷提供了一個封裝,業務方只需要繼承AbstractScanJdbcJob,並設定泛型引數domain物件和DTO物件即可。參考SynT1Data2ESJobV2

該方式的單表資料遍歷主要是通過資料庫表中的id自增主鍵進行切分,每次取1000條,轉化為domain物件,然後通過呼叫covertToDTO轉化為DTO物件,然後通過mq傳送資料加工系統資訊。

public class T1Data2ESJobV2 extends AbstractScanJdbcJob<Customer, CustomerDTO> {
    @Resource(name = "haixueJdbcTemplate")
    private JdbcTemplate jdbcTemplate;

    @Override
    protected JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }
}

/**
 * bimq-connector Created by caowenyi on 2017/11/10 .
 */
@Slf4j
public abstract class AbstractScanJdbcJob<T, DTO extends BiSerializable> implements SimpleJob {
    @Resource
    private MessageSender messageSender;

    protected Class<T> tClass;
    protected Class<DTO> dtoClass;
    private String idField;

    protected String tableName;
    private RateLimiter rateLimiter = RateLimiter.create(50);

    protected abstract JdbcTemplate getJdbcTemplate();

    protected DTO covertToDTO(T obj) {
        return BeanCovertUtil.copy(obj, dtoClass);
    }


    private Long getId(T obj) {
        try {
            Field idField = tClass.getDeclaredField(this.idField);
            idField.setAccessible(true);
            Object id = idField.get(obj);
            return (Long) id;
        } catch (NoSuchFieldException | IllegalAccessException e) {
            log.error(e.getMessage(), e);
        }
        return null;
    }

    protected long getStartId() {
        return 0L;
    }

    protected int getPageSize() {
        return 1000;
    }

    public AbstractScanJdbcJob(){
        this("id");
    }

    public AbstractScanJdbcJob(String idField) {
        this.tClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        this.dtoClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1];
        this.tableName = tClass.getSimpleName().toLowerCase();
        this.idField=idField;
    }

    public AbstractScanJdbcJob(String idField,String tableName) {
        this.tClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        this.dtoClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1];
        this.tableName = tableName;
        this.idField=idField;

    }

    @Override
    public void execute(ShardingContext shardingContext) {
        long startId = getStartId();
        int pageSize = getPageSize();
        JdbcTemplate jdbcTemplate = getJdbcTemplate();
        while (true) {
            String sql = String.format("select * from %s where %s>%d order by %s limit %d ", tableName, idField, startId, idField, pageSize);
            List<T> objectList = jdbcTemplate.query(sql, (resultSet, row) -> {
                Map<String, Object> hm = new HashMap<>();
                ResultSetMetaData rsmd = resultSet.getMetaData();
                int count = rsmd.getColumnCount();
                for (int i = 1; i <= count; i++) {
                    String key = rsmd.getColumnLabel(i);
                    Object value = resultSet.getObject(i);
                    hm.put(key, value);
                }
                try {
                    return BeanCovertUtil.convertToJavaBeanIgnoreCase(hm, tClass);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
                return null;
            });
            //List<T> objectList = jdbcTemplate.queryForList(sql, tClass);
            if (CollectionUtils.isEmpty(objectList)) {
                break;
            }
            for (T obj : objectList) {
                DTO dtoObj = covertToDTO(obj);
                Long id = getId(obj);
                if (id == null) {
                    throw new IllegalArgumentException("id欄位不存在請重寫getNewKey方法");
                }
                dtoObj.setNewKey(String.valueOf(id));
                ConsumerMessageBuilder.MessageVo messageVo = ConsumerMessageBuilder.buildInsertMessageVo(dtoObj);
                try {
                    messageSender.sendMsg(messageVo);
                    rateLimiter.acquire(1);
                } catch (IllegalAccessException | InvocationTargetException | IntrospectionException e) {
                    log.error(e.getMessage(), e);
                }
                startId = id;
            }
        }
    }
}

其中提供瞭如下幾個protected方法用於相容一些使用者的細微不同:
1. 使用者需要提供具體使用的jdbc物件,因為不同的物件對應不同的jdbc物件名。具體參看:XXXX1DataSourceConfig、XXXX2DataSourceConfig、XXXX3DataSourceConfig

protected abstract JdbcTemplate getJdbcTemplate();
  1. 支援使用者轉入的欄位進行分片,在構造引數中傳入改欄位名即可
public AbstractScanJdbcJob(String idField) {
        this.tClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        this.dtoClass = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1];
        this.tableName = tClass.getSimpleName().toLowerCase();
        this.idField=idField;

    }
  1. 支援使用者設定tableName,預設為domain的名字
  2. 支援使用者設定開始資料遍歷的起始位置和每次從資料庫中獲取的數量
protected long getStartId() {
        return 0L;
}

protected int getPageSize() {
        return 1000;
}

PS:業務方也可以不用繼承AbstractScanJdbcJob類,只需要MessageSender messageSender傳送資料加工系統DTO資料即可(該方法有必傳欄位的限制)。

3.4.3 其他方式的資料加工系統錄入

為支援一些非資料庫方式的資料加工系統資料需要放到elasticsearch的需求,提供了RPC方式錄入資料加工系統資料。

  1. 引入依賴包
<dependency>
    <groupId>com.haixue</groupId>
        <artifactId>bimq-connector-remote</artifactId>
        <version>1.0.2-SNAPSHOT</version>
</dependency>
  1. 匯入配置檔案,並設定zk.hosts和dubbo.port兩個值
    springboot形式
@Configuration
@ImportResource(locations = {"classpath*:applicationPersonal-client.xml"})
public class DubboClientConfig {

}

spring形式
在已經的xml bean配置檔案中新增

<import resource="applicationPersonal-client.xml" />

程式碼:

@Slf4j
public class RemoteSqlTest extends BaseTest{
    @Resource
    private MessageService biMessageService;

    @Test
    public void testMessage() throws Exception {
        CustomerDTO customerDTO=new CustomerDTO();
        customerDTO.setId(7749763L);
        customerDTO.setRegisterdate(DateUtils.parse("2017-11-08 15:52:15",PATTERN_YYYY_MM_DD_HH_MM_SS));
        customerDTO.setRegisterplace("4");
        customerDTO.setNewKey("7749763");
        ApiResponse apiResponse=biMessageService.sendInsertMessage(customerDTO);
        log.info(apiResponse.toString());
    }
}

3.4.4 資料加工系統資料的sql查詢服務

支援了基於sql的elasticsearch查詢的dubbo服務,其jar包引入和spring配置參考 “3.4.3 其他方式的資料加工系統錄入”

程式碼:

@Slf4j
public class RemoteSqlTest extends BaseTest{
    @Resource
    private SqlSearchPersonalService sqlSearchPersonalService;
    @Test
    public void testSql() throws Exception {
        ApiResponse apiResponse=sqlSearchPersonalService.searchBySql("select * from crmcommunicationdto_index");
        log.info(apiResponse.toString());
    }
}

3.5 crm-bi-task錄入資料加工系統資料【bi開發人員】

crm-bi-task服務通過實現資料加工系統佇列的消費者來將業務方提供的資料加工系統資料錄入到elasticsearch中。

1.建立es中的索引物件
customerdto_index

{
  "settings": {
    "index": {
      "number_of_replicas": "1",
      "number_of_shards": "6"
    }
  },
  "_default_": {
    "_all": {
      "enabled": false
    }
  },
  "mappings": {
    "customerdto_dim": {
      "date_detection": true,
      "dynamic_date_formats": [
        "yyyy-MM-dd HH:mm:ss"
      ],
      "dynamic": "true",
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        },
        {
          "texts": {
            "match_mapping_type": "text",
            "mapping": {
              "type": "string",
              "index": "not_analyzed"
            }
          }
        },
        {
            "dates": {
            "match_mapping_type": "date",
            "mapping": {
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
            }
          }
        }
      ],
      "properties": {
        "newKey": {
          "type": "string",
          "index": "not_analyzed"
        },
        "oldKey": {
          "type": "string",
          "index": "not_analyzed"
        },
        "id": {
          "type": "long"
        },
        "registerdate": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
        },
        "registerplace": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}
  1. 實現需要監聽訊息佇列名常量
public class QueueConstants {
    public static final String T1_QUEUE = "mq.T1DTO.queue";
    public static final String T2_QUEUE = "mq.T2DTO.queue";
    public static final String T3_QUEUE = "mq.T3DTO.queue";
    public static final String T4_QUEUE = "mq.T4DTO.queue";
    public static final String T5_QUEUE = "mq.T5DTO.queue";
    public static final String T6_QUEUE = "mq.T6DTO.queue";
    public static final String T7_QUEUE = "mq.T7DTO.queue";
    public static final String T8_QUEUE = "mq.T8DTO.queue";

}
  1. 實現消費者
    只需要繼承AbstractConsumer,然後傳入需要錄入的DTO物件即可。
@Component
@RabbitListener(queues = QueueConstants.CUSTOMER_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public class T1Consumer extends AbstractConsumer<T1DTO> {
}

4.其他支援

該系統除了支援資料加工系統的需求外,對於另外一些需要通過把資料庫表同步到其他資料儲存中間的清洗,例如solar等。業務方只需要申請要監控表格,然後監聽具體的訊息佇列,然後處理資料庫記錄變更,並同步到其他中介軟體儲存系統中。特別適合一些快取的設定。

效能測試

測試環境:
1)canal-server/canal-client/bimq-connector-consumer都被部署在MacBook Pro單機環境中(處理器=2.2 GHz Intel Core i7 記憶體=16 GB 1600 MHz DDR3 影象卡=Intel Iris Pro 1536 MB 磁碟=500GB 作業系統=macOS Sierra JVM記憶體使用情況:-Xmx1g -Xms1g -Xmn512m
2)mysql和rabbitmq都是單機部署在同一臺虛擬機器:虛擬機器配置(處理器=Intel® Core™ i7-4770HQ CPU @ 2.20GHz、記憶體=992.3 MiB 影象卡=Gallium 0.4 on llvmpipe (LLVM 3.8, 256 bits) 磁碟=51.7 GB 作業系統型別=64位
3) 單機測試由於效能限制(最大上限600qps)

具體效能如下:
這裡寫圖片描述
4)總結延遲和資料操作的型別、當個操作影響的記錄條數沒有直接關係,延遲維持在350ms,整個服務的效能只和資料庫、canal服務的部署環境有關,並且能夠支援600qps以上的操作。