1. 程式人生 > >Gobblin Kafka Source原始碼分析

Gobblin Kafka Source原始碼分析

  private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
      Optional topicSpecificState) {
    Extract extract = this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, partition.getTopicName());
    WorkUnit workUnit = WorkUnit.create(extract);
    if (topicSpecificState.isPresent()) {
      workUnit.addAll(topicSpecificState.get());
    }
    workUnit.setProp(TOPIC_NAME, partition.getTopicName());
    workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName());
    workUnit.setProp(PARTITION_ID, partition.getId());
    workUnit.setProp(LEADER_ID, partition.getLeader().getId());
    workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
    workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
    workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
    return workUnit;
  }