Перейти к основному содержимому
Перейти к основному содержимому

Коннектор Flink

ClickHouse Supported

Это официальный Apache Flink Sink Connector с поддержкой от ClickHouse. Он построен на основе AsyncSinkBase Flink и официального java client ClickHouse.

Коннектор поддерживает DataStream API Apache Flink. Поддержка Table API запланирована в одном из будущих релизов.

Требования

  • Java 11+ (для Flink 1.17+) или 17+ (для Flink 2.0+)
  • Apache Flink 1.17+

Коннектор разбит на два артефакта для поддержки Flink 1.17+ и Flink 2.0+. Выберите артефакт, соответствующий нужной версии Flink:

Версия FlinkАртефактВерсия ClickHouse Java ClientТребуемая версия Java
latestflink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.1flink-connector-clickhouse-2.0.00.9.5Java 17+
2.0.0flink-connector-clickhouse-2.0.00.9.5Java 17+
1.20.2flink-connector-clickhouse-1.170.9.5Java 11+
1.19.3flink-connector-clickhouse-1.170.9.5Java 11+
1.18.1flink-connector-clickhouse-1.170.9.5Java 11+
1.17.2flink-connector-clickhouse-1.170.9.5Java 11+
Примечание

Коннектор не тестировался на версиях Flink ниже 1.17.2

Установка и настройка

Добавьте как зависимость

<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

Скачайте бинарный файл

Шаблон имени JAR-файла:

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

где:

Все доступные JAR-файлы опубликованных релизов можно найти в репозитории Maven Central.

Использование DataStream API

Пример

Предположим, вы хотите вставить необработанные данные CSV в ClickHouse:

public static void main(String[] args) {
    // Настройте ClickHouseClient
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // Создайте ElementConverter
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // Создайте sink и задайте формат с помощью `setClickHouseFormat`
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // Наконец, подключите DataStream к sink.
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);
}

Дополнительные примеры и фрагменты кода можно найти в наших тестах:

Пример быстрого запуска

Мы подготовили пример на базе Maven для быстрого начала работы с ClickHouse Sink:

Более подробные инструкции см. в руководстве по примеру

Варианты подключения к DataStream API

Параметры клиента ClickHouse

ПараметрыОписаниеЗначение по умолчаниюОбязательно
urlПолный URL ClickHouseН/ДДа
usernameИмя пользователя базы данных ClickHouseН/ДДа
passwordПароль базы данных ClickHouseН/ДДа
databaseИмя базы данных ClickHouseН/ДДа
tableИмя таблицы ClickHouseН/ДДа
optionsmap параметров конфигурации Java-клиентаПустой mapНет
serverSettingsmap настроек сессии сервера ClickHouseПустой mapНет
enableJsonSupportAsStringНастройка сервера ClickHouse, при которой для типа данных JSON ожидается String в формате JSONtrueНет

options и serverSettings следует передавать клиенту как Map<String, String>. Если для любого из них передан пустой map, будут использоваться значения по умолчанию клиента или сервера соответственно.

Примечание

Все доступные параметры Java-клиента перечислены в ClientConfigProperties.java и на этой странице документации.

Все доступные настройки сессии сервера перечислены на этой странице документации.

Например:

Map<String, String> javaClientOptions = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

Map<String, String> serverSettings = Map.of(
    "insert_deduplicate", "1"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    javaClientOptions,
    serverSettings,
    false // enableJsonSupportAsString
);

Параметры sink

Следующие параметры напрямую взяты из AsyncSinkBase во Flink:

ParametersDescriptionDefault ValueRequired
maxBatchSizeМаксимальное количество записей, вставляемых за один пакетN/AДа
maxInFlightRequestsМаксимальное количество запросов в обработке, допустимое до того, как sink начнет применять обратное давлениеN/AДа
maxBufferedRequestsМаксимальное количество записей, которое может быть буферизовано в sink до применения обратного давленияN/AДа
maxBatchSizeInBytesМаксимальный размер пакета (в байтах). Все отправляемые пакеты будут меньше либо равны этому значениюN/AДа
maxTimeInBufferMSМаксимальное время, в течение которого запись может находиться в sink перед сбросомN/AДа
maxRecordSizeInBytesМаксимальный размер записи, который принимает sink; записи большего размера будут автоматически отклоненыN/AДа

Поддерживаемые типы данных

В таблице ниже приведена краткая справка по преобразованию типов данных при вставке данных из Flink в ClickHouse.

Тип JavaТип ClickHouseПоддерживаетсяМетод сериализации
byte/ByteInt8DataWriter.writeInt8
short/ShortInt16DataWriter.writeInt16
int/IntegerInt32DataWriter.writeInt32
long/LongInt64DataWriter.writeInt64
BigIntegerInt128DataWriter.writeInt128
BigIntegerInt256DataWriter.writeInt256
short/ShortUInt8DataWriter.writeUInt8
int/IntegerUInt8DataWriter.writeUInt8
int/IntegerUInt16DataWriter.writeUInt16
long/LongUInt32DataWriter.writeUInt32
long/LongUInt64DataWriter.writeUInt64
BigIntegerUInt64DataWriter.writeUInt64
BigIntegerUInt128DataWriter.writeUInt128
BigIntegerUInt256DataWriter.writeUInt256
BigDecimalDecimalDataWriter.writeDecimal
BigDecimalDecimal32DataWriter.writeDecimal
BigDecimalDecimal64DataWriter.writeDecimal
BigDecimalDecimal128DataWriter.writeDecimal
BigDecimalDecimal256DataWriter.writeDecimal
float/FloatFloatDataWriter.writeFloat32
double/DoubleDoubleDataWriter.writeFloat64
boolean/BooleanBooleanDataWriter.writeBoolean
StringStringDataWriter.writeString
StringFixedStringDataWriter.writeFixedString
LocalDateDateDataWriter.writeDate
LocalDateDate32DataWriter.writeDate32
LocalDateTimeDateTimeDataWriter.writeDateTime
ZonedDateTimeDateTimeDataWriter.writeDateTime
LocalDateTimeDateTime64DataWriter.writeDateTime64
ZonedDateTimeDateTime64DataWriter.writeDateTime64
int/IntegerTimeН/Д
long/LongTime64Н/Д
byte/ByteEnum8DataWriter.writeInt8
int/IntegerEnum16DataWriter.writeInt16
java.util.UUIDUUIDDataWriter.writeIntUUID
StringJSONDataWriter.writeJSON
Array<Type>Array<Type>DataWriter.writeArray
Map<K,V>Map<K,V>DataWriter.writeMap
Tuple<Type,..>Tuple<T1,T2,..>DataWriter.writeTuple
ObjectVariantН/Д

Примечания:

  • При выполнении операций с датами необходимо указать ZoneId.
  • При выполнении операций с десятичными числами необходимо указать точность и масштаб.
  • Чтобы ClickHouse мог разобрать строку Java как JSON, необходимо включить enableJsonSupportAsString в ClickHouseClientConfig.
  • Коннектору требуется ElementConvertor для преобразования элементов входного DataStream в данные для ClickHouse. Для этого коннектор предоставляет ClickHouseConvertor и POJOConvertor, которые можно использовать для реализации этого преобразования с помощью указанных выше методов сериализации DataWriter.

Поддерживаемые входные форматы

Список доступных входных форматов ClickHouse можно найти на этой странице документации и в ClickHouseFormat.java.

Чтобы указать формат, который коннектор должен использовать для сериализации вашего DataStream в данные для ClickHouse, используйте функцию setClickHouseFormat. Например:

ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
        convertorString,
        MAX_BATCH_SIZE,
        MAX_IN_FLIGHT_REQUESTS,
        MAX_BUFFERED_REQUESTS,
        MAX_BATCH_SIZE_IN_BYTES,
        MAX_TIME_IN_BUFFER_MS,
        MAX_RECORD_SIZE_IN_BYTES,
        clickHouseClientConfig
);
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
Примечание

По умолчанию коннектор использует RowBinaryWithDefaults или RowBinary, если параметр setSupportDefault в ClickHouseClientConfig явно установлен в true или false соответственно.

Метрики

Коннектор предоставляет следующие дополнительные метрики в дополнение к уже существующим метрикам Flink:

MetricDescriptionTypeStatus
numBytesSendОбщее количество байтов, отправленных в ClickHouse в полезной нагрузке запроса. Примечание: эта метрика измеряет размер сериализованных данных, переданных по сети, и может отличаться от written_bytes в system.query_log ClickHouse, который отражает фактическое количество байтов, записанных в хранилище после обработкиCounter
numRecordSendОбщее количество записей, отправленных в ClickHouseCounter
numRequestSubmittedОбщее количество отправленных запросов (фактическое количество выполненных сбросов)Counter
numOfDroppedBatchesОбщее количество батчей, отброшенных из-за ошибок, не допускающих повторной попыткиCounter
numOfDroppedRecordsОбщее количество записей, отброшенных из-за ошибок, не допускающих повторной попыткиCounter
totalBatchRetriesОбщее количество повторных попыток отправки батчей из-за ошибок, допускающих повторную попыткуCounter
writeLatencyHistogramГистограмма распределения задержки успешной записи (мс)Histogram
writeFailureLatencyHistogramГистограмма распределения задержки неуспешной записи (мс)Histogram
triggeredByMaxBatchSizeCounterОбщее количество сбросов, вызванных достижением maxBatchSizeCounter
triggeredByMaxBatchSizeInBytesCounterОбщее количество сбросов, вызванных достижением maxBatchSizeInBytesCounter
triggeredByMaxTimeInBufferMSCounterОбщее количество сбросов, вызванных достижением maxTimeInBufferMSCounter
actualRecordsPerBatchГистограмма распределения фактического размера батчаHistogram
actualBytesPerBatchГистограмма распределения фактического количества байтов в батчеHistogram

Ограничения

  • В настоящее время sink предоставляет гарантию доставки как минимум один раз. Работа над семантикой exactly-once отслеживается здесь.
  • Sink пока не поддерживает очередь необрабатываемых сообщений (DLQ) для буферизации записей, которые не удалось обработать. Пока коннектор будет пытаться повторно вставить записи, завершившиеся ошибкой, и отбрасывать их в случае неудачи. Эта возможность отслеживается здесь.
  • Sink пока не поддерживает создание через Table API Flink или Flink SQL. Эта возможность отслеживается здесь.

Совместимость версий ClickHouse и безопасность

  • Коннектор ежедневно тестируется в CI с рядом последних версий ClickHouse, включая latest и head. Список тестируемых версий периодически обновляется по мере выхода новых релизов ClickHouse. Список версий, с которыми коннектор ежедневно проходит тесты, см. здесь.
  • Сведения об известных уязвимостях и инструкции по сообщению о новой уязвимости см. в политике безопасности ClickHouse.
  • Мы рекомендуем регулярно обновлять коннектор, чтобы своевременно получать исправления безопасности и другие улучшения.
  • Если у вас возникла проблема с миграцией, создайте issue в GitHub, и мы ответим!
  • Для оптимальной производительности убедитесь, что тип элементов вашего DataStream не является Generic — см. описание различий между типами во Flink. Элементы не типа Generic позволяют избежать накладных расходов на сериализацию через Kryo и повысить пропускную способность при записи в ClickHouse.
  • Мы рекомендуем установить maxBatchSize как минимум в 1000, а в идеале — в диапазоне от 10 000 до 100 000. Подробнее см. в этом руководстве по пакетным вставкам.
  • Чтобы выполнять дедупликацию в стиле OLTP или upsert в ClickHouse, обратитесь к этой странице документации. Примечание: не путайте это с дедупликацией пакетов при повторных попытках, которая подробно описана ниже.

Устранение неполадок

CANNOT_READ_ALL_DATA

Может возникнуть следующая ошибка:

com.clickhouse.client.api.ServerException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 9205. Bytes expected: 1100022.: (at row 9) : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)

Причина: Чаще всего ошибка CANNOT_READ_ALL_DATA означает, что схема таблицы ClickHouse перестала соответствовать схеме записей Flink. Это может произойти, если одна из них была изменена с нарушением обратной совместимости.

Решение: Обновите схему таблицы ClickHouse, входной тип данных коннектора или и то и другое, чтобы они снова стали совместимыми. При необходимости см. сопоставление типов, чтобы понять, как типы Java соотносятся с типами ClickHouse. Примечание: если какие-то записи все еще находятся в обработке, при перезапуске коннектора потребуется сбросить состояние Flink.

Низкая пропускная способность

Вы можете заметить, что пропускная способность коннектора не масштабируется вместе с параллелизмом задания (числом задач Flink) при записи в ClickHouse.

Причина: фоновый процесс слияния частей в ClickHouse может замедлять вставки. Это может происходить, если настроенный размер пакета слишком мал, коннектор слишком часто выполняет сброс, или из-за сочетания обоих факторов.

Решение: Мониторьте метрики numRequestSubmitted и actualRecordsPerBatch, чтобы понять, как подобрать размер пакета (maxBatchSize) и частоту сброса. Рекомендации по размеру пакета также приведены в разделе Расширенное и рекомендуемое использование.

В таблице ClickHouse отсутствуют строки

Причина: Пакеты были отброшены либо из-за ошибки, не подлежащей повторной попытке, либо потому, что их не удалось вставить за заданное число повторных попыток (настраивается через ClickHouseClientConfig.setNumberOfRetries()). Примечание: по умолчанию коннектор пытается повторно вставить пакет до 3 раз, прежде чем отбросить его.

Решение: Проверьте логи TaskManager и/или трассировки стека, чтобы определить первопричину.

Участие в разработке и поддержка

Если вы хотите внести вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашей помощи! Перейдите в наш репозиторий GitHub, чтобы создать issue, предложить улучшения или отправить pull request.

Мы приветствуем ваш вклад! Перед началом работы, пожалуйста, ознакомьтесь с руководством для участников в репозитории. Спасибо, что помогаете улучшать коннектор ClickHouse для Flink!