Gobblin Kafka Source原始碼分析
阿新 • • 發佈:2019-02-04
/**
 * Builds the {@link WorkUnit} describing one Kafka topic partition.
 *
 * <p>The work unit is created from an extract keyed on the partition's topic name, optionally
 * seeded with topic-specific state, and then populated with the topic name, partition id,
 * leader id, leader host/port, and the low/high watermarks taken from {@code offsets}.
 *
 * <p>NOTE(review): {@code topicSpecificState} is declared as a raw {@code Optional}; the type
 * argument looks like it was lost when this snippet was extracted. Confirm against the real
 * source before relying on this signature.
 *
 * @param partition the Kafka partition this work unit covers
 * @param offsets supplies the start offset (low watermark) and latest offset (high watermark)
 * @param topicSpecificState extra state merged into the work unit when present
 * @return the populated work unit
 */
private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets,
    Optional topicSpecificState) {
  String topicName = partition.getTopicName();

  WorkUnit unit =
      WorkUnit.create(this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, topicName));

  // Topic-specific state is merged before the explicit properties below are written,
  // so the assignments that follow are applied after it.
  if (topicSpecificState.isPresent()) {
    unit.addAll(topicSpecificState.get());
  }

  // Topic and partition identity.
  unit.setProp(TOPIC_NAME, topicName);
  unit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, topicName);
  unit.setProp(PARTITION_ID, partition.getId());

  // Leader broker coordinates.
  unit.setProp(LEADER_ID, partition.getLeader().getId());
  unit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());

  // Offset range: start offset is the low watermark, latest offset the high watermark.
  unit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
  unit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());

  return unit;
}