使用Kafka连接从PostgreSQL到Azure Data Explorer的变更数据捕获


这篇博文演示了如何使用变更数据捕获从PostgreSQL到 Azure Data Explorer (Kusto)使用 Apache Kafka 流数据库修改。

变更数据捕获(CDC)可用于跟踪数据库表中响应创建、更新和删除操作的行级变化。这是一个强大的技术,但只有在有办法利用这些事件并将其提供给其他服务时才有用。

介绍

使用Apache Kafka,可以将传统的成批的ETL过程转换为实时、流式的模式。你可以自己动手(DIY),使用你选择的客户端SDK编写好的老Kafka生产者/消费者。但是,当你有了Kafka Connect和它的一套即用型连接器之后,你为什么还要这样做呢?

一旦你选择了Kafka Connect,你有几个选择。一个是JDBC连接器,它基本上是通过调查目标数据库表来获取信息。有一个更好的(尽管有点复杂)基于变化数据捕获的方法。进入 Debezium ,这是一个分布式平台,建立在不同数据库中可用的变化数据捕获功能之上。它提供了一套 Kafka Connect连接器 ,可以挖掘数据库表中的行级变化,并将其转换成事件流,发送到Apache Kafka。一旦变化日志事件进入Kafka,它们就会被所有的下游应用所利用。

这里是本帖中提出的使用案例的高级概述。为了演示的目的,它已经被简化了。

概述

Orders 有关的数据存储在PostgreSQL数据库中,包含诸如订单ID、客户ID、城市、交易金额、时间等信息。这些数据由 PostgreSQL的Debezium连接器 拾取并发送到Kafka主题。一旦数据进入Kafka,另一个(水槽)连接器将它们发送到Azure Data Explorer,允许进一步查询和分析。

端到端解决方案中使用的各个组件如下。

来源和目的地

数据管道可能相当复杂! 本博文提供了一个简化的例子,PostgreSQL数据库将被用作数据源,而大数据分析引擎则作为最终目的地(汇)。这两个组件都在Azure中运行。 Azure Database for PostgreSQL Source 是一个基于开源 Postgres 数据库引擎的关系型数据库服务, Azure Data Explorer Sink 是一个快速、可扩展的数据探索服务,让你收集、存储和分析来自任何不同来源的大量数据,例如网站、应用程序、物联网设备等等。

虽然本博客中使用了Azure PostgreSQL DB,但该说明应该适用于任何Postgres数据库。所以,如果你愿意的话,可以随意使用其他的选项

与本博文相关的代码和配置可以在这个 GitHub仓库中找到

Kafka和Kafka Connect

Apache Kafka和Kafka Connect一起作为一个可扩展的流式数据管道平台--这里的关键组件是源和汇连接器。

用于PostgreSQL的Debezium连接器捕获插入、更新和删除数据库内容的行级变化,并提交给PostgreSQL数据库,生成数据变化事件记录,并将其流向Kafka主题。在幕后,它使用一个Postgres输出插件(如 wal2jsonpgoutput 等)和(Java)连接器本身的组合,使用 PostgreSQL的流式复制协议 JDBC驱动程序 读取输出插件产生的变化。

Azure Data Explorer水槽连接器 从配置的Kafka主题中拾取数据,分批将它们发送到Azure Data Explorer,在那里排队摄取,最终写入Azure Data Explorer中的表格。该连接器利用 Azure Data Explorer的Java SDK

大多数组件(除了Azure Data Explorer和Azure PostgreSQL DB)都是作为Docker容器(使用Docker Compose)运行的--Kafka(和Zookeeper)、Kafka Connect工作者和数据生成器应用程序。尽管如此,只要所有的组件都被配置成可以按要求相互访问和通信,该说明就可以适用于任何Kafka集群和Kafka Connect工作者。例如,你可以在 Azure HD Insight Azure Marketplace 上的Confluent Cloud上有一个Kafka集群。

如果你对这些场景感兴趣,请查看这些 实践实验室

以下是组件及其服务定义的分类 - 你可以参考GitHub repo中的完整 docker-compose 文件

Docker Compose Services

   zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092    


Kafka和Zookeeper使用 debezium 镜像运行 - 它们只是工作,对于具有快速反馈循环的迭代开发和演示等非常好。

   dataexplorer-connector:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8080:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=adx_connect_configs
      - OFFSET_STORAGE_TOPIC=adx_connect_offsets
      - STATUS_STORAGE_TOPIC=adx_connect_statuses
  postgres-connector:
    image: debezium/connect:1.2
    ports:
      - 9090:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=pg
      - CONFIG_STORAGE_TOPIC=pg_connect_configs
      - OFFSET_STORAGE_TOPIC=pg_connect_offsets
      - STATUS_STORAGE_TOPIC=pg_connect_statuses    


Kafka Connect源和汇连接器作为独立的容器运行,只是为了让你更容易理解和推理--也可以在一个容器中运行这两个连接器。

请注意,PostgreSQL连接器是内置于 debezium/connect 镜像中的,而Azure Data Explorer连接器是使用自定义镜像设置的。 Dockerfile 是相当紧凑的。

   FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar    


最后, orders-gen 服务只是 Go 应用程序将随机订单数据播入PostgreSQL。你可以参考GitHub repo 中的Dockerfile

 orders-gen。
    build:
      上下文: ./orders-generator
    环境。
      - pg_host=    
      - pg_user=    
      - pg_password=    
      - PG_DB=  



希望你现在已经对架构和所涉及的组件有了合理的理解。在进入实际操作方面之前,你需要处理一些事情。

先决条件

最后,克隆这个GitHub repo。

   git clone https://github.com/abhirockzz/kafka-adx-postgres-cdc-demo
cd kafka-adx-postgres-cdc-demo    


首先,让我们确保你已经设置和配置了Azure Data Explorer和PostgreSQL数据库。

设置和配置Azure Data Explorer

  1. 创建一个Azure Data Explorer集群和一个数据库 - 这个快速入门 将指导你完成这个过程。

  2. 使用下面的 KQL 查询创建一个表( Orders )和映射( OrdersEventMapping )。

   .create table Orders (orderid: string, custid: string, city: string, amount: int, purchase_time: datetime)

.create table Orders ingestion json mapping 'OrdersEventMapping' '[{"column":"orderid","Properties":{"path":"$.orderid"}},{"column":"custid","Properties":{"path":"$.custid"}},{"column":"city","Properties":{"path":"$.city"}},{"column":"amount","Properties":{"path":"$.amount"}},{"column":"purchase_time","Properties":{"path":"$.purchase_time"}}]'    


在摄取过程中,Azure Data Explorer试图通过将等待摄取的小块摄取数据批在一起来优化吞吐量-- IngestionBatching策略 可以用来微调这一过程。另外,为了这个演示的目的,你可以这样更新策略。

  .alter table Orders policy ingestionbatching @'{"MaximumBatchingTimeSpan: "00:00:30", "MaximumNumberOfItems": 500, "MaximumRawDataSizeMB": 1024}'

.show table     .Orders policy ingestionbatching  


详情请参考 IngestionBatching policy命令参考

  1. 创建一个服务委托人,以便连接器能够验证并连接到Azure Data Explorer服务。如果你想使用Azure门户来做这件事,请参考 如何。使用门户来创建可以访问资源的Azure AD应用和服务委托人 。下面的例子使用了Azure CLI az ad sp create-for-rbac 命令。例如,要创建一个名称为 adx-sp 的服务委托人:
   az ad sp create-for-rbac -n "adx-sp"    


你会得到一个JSON响应。

   {
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}    


请记下 appIdpasswordtenant ,因为你将在后续步骤中使用它们

  1. 为你的数据库添加权限

为你刚刚创建的服务委托人提供适当的角色。要分配 admin 角色, 按照本指南 使用Azure门户或在你的Data Explorer集群中使用以下命令

  .add database     admins ('adapp=    ;    ') 'AAD App'  


Setup and Configure Azure PostgreSQL DB

你可以使用各种选项在Azure上设置PostgreSQL,包括: Azure Portal , Azure CLI , Azure PowerShell , ARM模板 。一旦你完成了这些工作,你就可以使用你最喜欢的编程语言轻松地连接到数据库,如 Java .NET Node.js Python Go 等等。

尽管上述参考资料是针对单服务器部署模式的,但请注意, Hyperscale(Citus)是另一种部署模式,你可以使用 "接近--或已经超过--100GB数据的工作负荷"。

请确保你随时准备好以下与PostgreSQL有关的信息,因为你将需要它们在随后的章节中配置Debezium连接器--数据库主机名(和端口)、用户名、密码

为了使端到端解决方案按预期工作,我们需要。

  • 确保Azure中的PostgreSQL实例可以从本地Kafka Connect工作者(容器)中访问
  • 确保适当的PostrgeSQL复制设置("逻辑")
  • 创建 Orders 表,你将用它来尝试变化数据捕获功能

如果你使用Azure DB的PostgreSQL,使用 az postgres server firewall-rule create 命令创建一个防火墙规则,将你的主机列入白名单。由于我们在Docker本地运行Kafka Connect,只需导航到Azure门户( 我的PostrgreSQL实例的连接安全 部分),并选择 添加当前客户端IP地址 ,以确保你的本地IP被添加到防火墙规则中。

要改变Azure DB for PostgreSQL的复制模式,你可以使用 az postgres server configuration 命令。

  az postgres server configuration set --resource-group     --server-name     --name azure.replication_support --value logical  


.或者使用Azure Portal中PostgreSQL实例的 Replication 菜单。

更新配置后,你需要重新启动服务器,你可以使用CLI( az postgres server restart )或门户来完成。

一旦数据库启动并运行,创建表。在这个例子中,我使用了 psql CLI,但也可以自由地使用任何其他工具。例如,通过SSL连接到你在Azure上的PostgreSQL数据库(会提示你输入密码)。

  psql -h     .postgres.database.azure.com -p 5432 -U     -W -d     --set=sslmode=require

//举例说明
psql -h my-pgsql.postgres.database.azure.com -p 5432 -U foo@my-pgsql -W -d postgres --set=sslmode=require  


使用下面的SQL来创建表。

   CREATE SCHEMA retail;

CREATE TABLE retail.orders_info (
    orderid SERIAL NOT NULL PRIMARY KEY,
    custid INTEGER NOT NULL,
    amount INTEGER NOT NULL,
    city VARCHAR(255) NOT NULL,
    purchase_time VARCHAR(20) NOT NULL
);    


purchase_time 捕获了执行购买的时间,但它使用 VARCHAR 而不是 TIMESTAMP 类型(理想情况下)来减少整体复杂性。这是因为 Debezium Postgres连接器处理TIMESTAMP数据类型的方式 (也是正确的!)

在接下来的几节中,你将设置源(PostgreSQL)、汇(Azure Data Explorer)连接器,并验证端到端的管道。

启动Docker容器

启动我们的本地环境非常容易,这要感谢Docker Compose--我们所需要的只是一条命令。

   docker-compose --project-name adx-kafka-cdc up --build    


这将建立(并启动)订单生成器应用容器以及Kafka、Zookeeper和Kafka Connect工作者。

下载和启动容器可能需要一些时间:这只是一个一次性的过程。

确认所有的容器是否已经启动。

   docker-compose -p adx-kafka-cdc ps


//output

                 Name                              Command             State                      Ports                   
--------------------------------------------------------------------------------------------------------------------------
adx-kafka-cdc_dataexplorer-connector_1   /docker-entrypoint.sh start   Up      0.0.0.0:8080->8083/tcp, 8778/tcp, 9092/tcp,
                                                                               9779/tcp                                   
adx-kafka-cdc_kafka_1                    /docker-entrypoint.sh start   Up      8778/tcp, 0.0.0.0:9092->9092/tcp, 9779/tcp 
adx-kafka-cdc_orders-gen_1               /orders-gen                   Up                                                 
adx-kafka-cdc_postgres-connector_1       /docker-entrypoint.sh start   Up      0.0.0.0:9090->8083/tcp, 8778/tcp, 9092/tcp,
                                                                               9779/tcp                                   
adx-kafka-cdc_zookeeper_1                /docker-entrypoint.sh start   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp,
                                                                               8778/tcp, 9779/tcp    


订单生成器应用程序将开始向PostgreSQL中的 orders_info 表插入随机订单事件。在这一点上,你也可以做快速的理智检查,以确认订单信息是否被持久化 - 我在下面的例子中使用了 psql

  psql -h     .postgres.database.azure.com -p 5432 -U     -W -d     --set=sslmode=require

select * from retail.orders_info order by orderid desc limit 5;  


这将给你五个最新的订单。

   orderid | custid | amount |   city    |    purchase_time    
---------+--------+--------+-----------+---------------------
      10 |     77 |    140 | Seattle   | 2020-10-09 07:10:49
      9  |    541 |    186 | Cleveland | 2020-10-09 07:10:46
      8  |    533 |    116 | Cleveland | 2020-10-09 07:10:42
      7  |    225 |    147 | Chicago   | 2020-10-09 07:10:39
      6  |    819 |    184 | Austin    | 2020-10-09 07:10:36
(5 rows)    


为了把 orders 数据流到Kafka,我们需要配置并启动Debezium PostgreSQL源连接器的实例。

Debezium PostgreSQL源连接器的设置

将下面的JSON内容复制到一个文件中(你可以给它命名 pg-source-config.json )。请确保你用与你的PostgreSQL实例相对应的值来更新以下属性。 database.hostname , database.user , database.password .

  {
    "name": "pg-orders-source"。
    "配置"。{
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector"。
        "数据库.主机名": "    .postgres.database.azure.com"。
        "数据库.端口"。"5432",
        "数据库.用户"。"    @    ",
        "数据库.密码"。"    ",
        "database.dbname": "postgres"。
        "数据库.服务器.名称"。"myserver"。
        "plugin.name": "wal2json"。
        "table.whiteelist": " retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}  


在撰写本文时,Debezium支持以下插件。 decoderbufs , wal2json , wal2json_rds , wal2json_streaming , wal2json_rds_streamingpgoutput 。我在这个例子中使用了 wal2json ,它在Azure上也是 s 支持的

要启动连接器,只需使用Kafka Connect REST端点来提**置。

   curl -X POST -H "Content-Type: application/json" --data @pg-source-config.json http://localhost:9090/connectors

# to confirm
curl http://localhost:9090/connectors/pg-orders-source    

注意REST端点的端口是 9090 --这是按 docker-compose.yaml 中定义的服务端口映射

让我们偷看一下Kafka主题,看看源连接器产生的变化数据捕获事件。

   docker exec -it adx-kafka-cdc_kafka_1 bash    


你将被放入一个shell(容器内)。执行下面的命令,从Kafka获取变化数据事件。

   cd bin && ./kafka-console-consumer.sh --topic myserver.retail.orders_info --bootstrap-server kafka:9092 --from-beginning    


注意主题名称 myserver.retail.orders_info 是Debezium连接器使用的惯例

主题中的每个事件都对应着一个特定的顺序。它的格式是JSON,看起来像下面描述的那样。请注意,有效载荷还包含整个 schema ,为简洁起见,已将其删除。

   {
    "schema": {....},
    "payload": {
        "before": null,
        "after": {
            "orderid": 51,
            "custid": 306,
            "amount": 183,
            "city": "Austin",
            "purchase_time":"2020-10-09 07:23:10"
        },
        "source": {
            "version": "1.2.1.Final",
            "connector": "postgresql",
            "name": "myserver",
            "ts_ms": 1602057392691,
            "snapshot": "false",
            "db": "postgres",
            "schema": "retail",
            "table": "orders_info",
            "txId": 653,
            "lsn": 34220200,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1602057392818,
        "transaction": null
    }
}    


到目前为止,我们有了我们管道的前半部分。让我们开始第二部分的工作吧!

Azure Data Explorer Sink Connector设置

将下面的JSON内容复制到一个文件中(可以命名为 adx-sink-config.json )。根据你的Azure数据资源管理器的设置,替换以下属性的值-- aad.auth.authorityaad.auth.appidaad.auth.appkeykusto.tables.topics.mapping (数据库名称)和 kusto.url

  {
    "名称"。"adx-orders-sink"。
    "配置"。{
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector"。
        "flush.size.bytes": 10000,
        "flush.interval.ms": 30000,
        "tasks.max": 2,
        "主题": "myserver. retail.orders_info",
        "kusto.tables.topics.mapping"。"[{'主题': 'myserver. retail.orders_info', 'db': '    ', 'table': 'Orders', 'format': 'json', 'mapping': 'OrdersEventMapping'}]"。
        "aad.auth.authority": "    ",
        "kusto.url": "https://ingest-    .    .kusto.windows.net"。
        "aad.auth.appid": "    ",
        "aad.auth.appkey": "    ",
        "key. converter": "org.apache.kafka.connect.storage.StringConverter"。
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}  


注意这里使用了Kafka Connect Single Message Transformation (SMT) - 这是Debezium提供的 ExtractNewRecordState 转换。你可以在文档中阅读它

  "transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"   


它从JSON有效载荷中删除 schema 和其他部分,只保留需要的部分。在这种情况下,我们所要寻找的是 after 属性中的订单信息(在有效载荷中)。例如:

   {
    "orderid": 51,
    "custid": 306,
    "amount": 183,
    "city": "Austin",
    "purchase_time":"2020-10-09 07:23:10"
}    


当然,你可以用不同的方式建模(在源连接器本身应用转换),但这种方法有几个好处。

  1. 只有相关的数据被发送到Azure Data Explorer
  2. Kafka主题包含 整个 变化数据事件(连同模式),可以被任何下游服务利用

要安装连接器,只需像以前一样使用Kafka Connect REST端点。

   curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8080/connectors

# check status
curl http://localhost:8080/connectors/adx-orders-sink/status    


注意REST端点的端口是 8080 --这是在 docker-compose.yaml

中定义的每个服务端口映射

连接器应该旋转起来,对Azure Data Explorer进行认证,并开始批处理摄入过程。

注意, flush.size.bytesflush.interval.ms 是用来调节批处理过程的。请参考 连接器文档 ,了解各个属性的细节。

由于连接器的冲洗配置和Azure Data Explorer中 Orders 表的批处理策略相当激进(用于演示),你应该看到数据迅速流入Data Explorer。



查询Azure数据资源管理器

你可以在数据资源管理器中查询订单表,对数据进行切片和切块。这里有几个简单的查询,可以先用。

获取来自纽约市的订单的详细信息;

   Orders
| where city == 'New York'    

只获取来自纽约市的订单的购买金额和时间,按金额排序

   Orders
| where city == 'New York'
| project amount, purchase_time
| sort by amount    

找出每个城市的平均销售额,并以列图表示。

   Orders
| summarize avg_sales = avg(amount) by city 
| render columnchart    

每个城市的采购总额,以饼状图表示。

   Orders 
| summarize total = sum(amount) by city 
| sort by total
| render piechart    

每个城市的订单数量,用线图表示。

   Orders
| summarize orders = count() by city
| sort by orders
| render linechart    

一天内的购买量是如何变化的?

   Orders
| extend hour = floor(purchase_time % 1d , 10m)
| summarize event_count=count() by hour
| sort by hour asc
| render timechart    

不同城市一天内的变化如何?

   Orders
| extend hour= floor( purchase_time % 1d , 10m)
| where city in ("New Delhi", "Seattle", "New York", "Austin", "Chicago", "Cleveland")
| summarize event_count=count() by hour, city
| render columnchart    

Azure数据资源管理器仪表板

清理

要停止容器,你可以。

   docker-compose -p adx-kafka-cdc down -v    


要删除Azure Data Explorer集群/数据库,使用 az集群删除 az kusto数据库删除 。对于PostgreSQL,只要使用 az postgres server delete

  az postgres server delete -g     -n     
az kusto cluster delete -n     -g     
az kusto database delete -n     --cluster-name     -g   



结论

Kafka Connect帮助你建立可扩展的数据管道,而不需要写自定义的管道代码。你主要需要设置、配置,当然还有操作连接器。记住,Kafka Connect工作实例只是JVM进程,根据你的规模和要求,你可以使用选择使用 Azure Kubernetes服务 来操作它们。由于Kafka Connect实例是无状态的实体,你在集群工作负载的拓扑结构和大小方面有很大的自由度!

其他资源

如果你想进一步探索,我建议