Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

作者:MeshCloud脉时云 公有云架构师 于文宝

背景

对于拥有许多独立数据源的企业而言,访问整个组织内的企业数据(尤其是实时访问)并非易事。这会导致数据访问受限且速度缓慢,因而造成组织无法进行检查。

Datastream 提供近乎实时的访问权限,让您能够更改各种本地和云端数据源中的数据以创建组织数据访问权限。Datastream 提供简单的设置体验和统一的使用 API,让组织普遍能够访问组织内可用的最新企业数据,从而为集成式近乎实时的场景提供支持。

其中一种场景是,将数据从源数据库转移到云端存储服务或消息传递队列,然后将这些数据转换为可供与该存储服务或消息传递队列通信的其他应用和服务(例如 Dataflow)读取的形式。Dataflow 是一项用于在 Google Cloud 上捕获和处理数据的 Web 服务。

在本教程中,您将了解 Datastream 如何通过简化的 Dataflow 模板与 Dataflow 无缝集成,从而在 BigQuery 中实现最新具体化视图以执行分析。

您将了解如何使用 Dataflow 将更改(插入、更新或删除的数据)从源 MySQL 数据库流式传输到 Cloud Storage 存储桶中的文件夹。

您将配置 Cloud Storage 存储桶以发送通知,供 Dataflow 了解包含 Datastream 从源数据库流式传输的数据更改的任何新文件。然后,Dataflow 作业将处理这些文件并将更改转移到 BigQuery。

架构图

Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

准备工作

  1. 启用 Datastream API。
  2. 确保您已为您的用户帐号分配 Datastream Admin 角色。
  3. 确保您有一个 Datastream 可以访问的源数据库。本教程使用 MySQL 数据库作为来源。
  4. 确保源数据库中有数据、表和架构。
  5. 配置源数据库以允许来自 Datastream 公共 IP 地址的传入连接。如需查看包含所有 Datastream 地区及其关联公共 IP 地址的列表,请参阅 IP 许可名单和地区。
  6. 为源数据库设置变更数据捕获 (CDC)。如需了解详情,请参阅配置源 MySQL 数据库。
  7. 确保您已配置 Datastream 可以访问的目标 Cloud Storage 存储桶。
  8. 确保您满足所有前提条件,以便为 Cloud Storage 启用 Pub/Sub 通知。
  9. 您将在 Cloud Storage 中创建目标存储桶,并为存储桶启用 Pub/Sub 通知。这样设置后,Dataflow 就可以接收通知来了解 Datastream 写入存储桶的新文件。这些文件包含 Dataflow 从源数据库流式传输到存储桶的数据更改。

要求

Datastream 提供各种来源选项、目标选项和网络连接方法。

在本教程中,假设您使用独立的 MySQL 数据库和目标 Cloud Storage 服务。对于源数据库,您应该能够将网络配置为添加入站防火墙规则。源数据库可以位于本地,也可以位于云提供商。对于 Cloud Storage 目标位置,无需配置连接。

由于我们无法获知您环境的具体细节,因此我们无法提供网络配置的详细步骤。

在本教程中,您将选择 IP 许可名单作为网络连接方法。IP 许可名单是一项安全功能,通常用于仅限受信任的用户访问您的源数据库中的数据并对这些访问进行控制。您可以使用 IP 许可名单创建受信任的 IP 地址或 IP 地址范围列表,您的用户和其他 Google Cloud 服务(如 Datastream)可通过这些地址访问此数据。要使用 IP 许可名单,您必须向来自 Datastream 的传入连接开放源数据库或防火墙。

在 Cloud Storage 中创建存储桶

在本部分中,您将在 Cloud Storage 中创建存储桶。DataStream 将架构、表和数据从源 MySQL 数据库流式传输到的目标存储桶。

  1. 转到 Google Cloud Console 中 Cloud Storage 的浏览器页面。
  2. 点击创建存储桶。此时将显示创建存储桶页面。
  3. 为存储桶命名区域的文本字段中,输入 my-integration-bucket-001,然后点击继续
  4. 对于页面每个剩余区域,请接受默认设置。为此,请点击每个区域底部的继续
  5. 点击创建
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

为 Cloud Storage 存储桶启用 Pub/Sub 通知

在本部分中,您将为创建的 Cloud Storage 存储桶启用 Pub/Sub 通知。这样,您就可以配置存储桶来发送通知,供 Dataflow 用来了解 Datastream 写入该存储桶的任何新文件。这些文件包含 Datastream 从源 MySQL 数据库流式传输到存储桶的数据的更改。

在为存储桶启用通知之前,请确保满足所有前提条件。

  1. 访问您创建的 Cloud Storage 存储桶。此时将显示存储桶详情页面。
  2. 点击页面右上角的激活 Cloud Shell 按钮。
  3. 在提示符处,输入以下命令:
  4. gsutil notification create -t my_integration_notifs -f json gs://my-integration-bucket-001
  5. 通过输入此命令,您将在 Pub/Sub 中创建 my_integration_notifs 主题。本主题将配置为向 Dataflow 发送通知,供其了解 Datastream 流式传输到 Cloud Storage 存储桶 (my-integration-bucket-001) 的数据的更改。
  6. 对此数据所做的任何更改都将捕获到 Pub/Sub 主题中。此主题的任何订阅者(例如 Dataflow)都可以获取此信息。
  7. 可选。如果看到授权 Cloud Shell 窗口,请点击授权
  8. 验证您是否看到以下代码行:
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  1. project-name 是您的 Google Cloud 项目名称的占位符。
  2. 转到 Google Cloud Console 中 Pub/Sub 的主题页面。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  1. 点击在此过程中创建的 my_integration_notifs 主题。
  2. 在 my_integration_notifs 页面,滚动到页面底部。订阅标签页处于活跃状态。此外,会出现没有可显示的订阅消息。
  3. 您将为 my_integration_notifs 主题创建订阅。订阅此订阅的应用(例如 Dataflow)可以获取主题中的信息。此信息与 Datastream 流式传输到 Cloud Storage 存储桶的源数据库数据更改相关联。
  4. 点击创建订阅,然后从显示的下拉菜单中选择创建订阅项。
  5. 填充向主题添加订阅页面,如下所示:
  1. 订阅 ID 字段中,输入订阅的 ID。在本教程中,在字段中输入 my_integration_notifs_sub。
  2. 保留页面上的所有其他默认值。
  3. 点击创建
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

在本教程后面部分,您将创建 Dataflow 作业。在创建此作业时,您需要将 Dataflow 指定为 my_integration_notifs_sub 订阅的订阅者。指定后,Dataflow 可以接收通知来了解 Datastream 写入 Cloud Storage 的新文件、处理这些文件并将数据更改转移到 BigQuery。

在 BigQuery 中创建数据集

在本部分中,您将在 BigQuery 中创建数据集。BigQuery 使用数据集来包含它从 Dataflow 接收的数据。此数据表示 Datastream 将流式传输到 Cloud Storage 存储桶的源 MySQL 数据库中的更改。

  1. 转到 Google Cloud Console 中 BigQuery 的 SQL 工作区页面。
  2. 探索器窗格中,点击您的 Google Cloud 项目名称右侧的查看操作按钮。该按钮形似垂直省略号。
  3. 从显示的下拉菜单中选择创建数据集
  4. 填充创建数据集窗口,如下所示:
  1. 数据集 ID 字段中,输入数据集的 ID。在本教程中,在字段中输入 My_integration_dataset_log。
  2. 保留窗口中的所有其他默认值。
  3. 点击创建数据集
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  1. Dataflow 将使用 My_integration_dataset_log 数据集来暂存从 Datastream 接收的数据更改。
  2. 探索器窗格中,点击 Google Cloud 项目名称左侧的节点图标,并验证您能看到自己创建的数据集。
  3. 按照此过程中的步骤创建第二个数据集:My_integration_dataset_final
  4. 在 My_integration_dataset_final 数据集中,将合并在 My_integration_dataset_log 数据集中暂存的更改,以在源数据库中创建表的一对一副本。
  5. 展开每个数据集左侧的节点。
  6. 验证每个数据集是否为空。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

在 Datastream 将数据更改从源数据库流式传输到 Cloud Storage 存储桶后,Dataflow 作业将处理包含更改的文件,并将更改转移到 BigQuery 数据集。

在 Datastream 中创建连接配置文件

在本部分中,您将在 Datastream 中为源数据库和目标位置创建连接配置文件。在创建连接配置文件时,您需要选择 MySQL 作为来源连接配置文件的类型,选择 Cloud Storage 作为目标连接配置文件的类型。

Dataflow 使用连接配置文件中定义的信息同时连接到来源和目标位置,以便可将数据从源数据库流式传输到 Cloud Storage 中的目标存储桶。

为 MySQL 数据库创建来源连接配置文件

  1. 转到 Google Cloud Console 中 Datastream 的连接配置文件页面。
  2. 点击创建配置文件
  3. 创建连接配置文件页面中,点击 MySQL 配置文件类型(因为您希望为 MySQL 数据库创建来源连接配置文件)。
  4. 创建 MySQL 配置文件页面的定义连接设置部分中提供以下信息:
  • 输入 My Source Connection Profile 作为源数据库的连接配置文件名称
  • 保留自动生成的连接配置文件 ID
  • 选择用于存储连接配置文件的地区
  • 与所有资源一样,连接配置文件也保存在地区中,并且数据流只能使用与该数据流存储在同一地区的连接配置文件。地区选择不会影响 Datastream 是否可连接到来源或目标位置,但如果该地区发生停机,则可能会影响可用性。
  • 输入连接详情
  • 主机名或 IP 字段中,输入 Datastream 可用于连接到源数据库的主机名或公共 IP 地址。您将提供公共 IP 地址,因为 IP 许可名单将用作本教程的网络连接方法。
  • 端口字段中,输入为源数据库预留的端口号。对于 MySQL 数据库,默认端口通常为 3306。
  • 输入用户名密码,对源数据库进行身份验证。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  1. 定义连接设置部分中,点击继续创建 MySQL 配置文件页面的确保与源的连接安全部分处于活跃状态。
  2. 加密类型菜单中,选择。如需详细了解此菜单,请参阅为 MySQL 数据库创建连接配置文件。
  3. 确保与源的连接安全部分中,点击继续创建 MySQL 配置文件页面的定义连接方法部分处于活跃状态。
  4. 选择您要在源数据库与 Cloud Storage 中的目标存储桶之间建立连接时使用的网络方法。在本教程中,请使用连接方法下拉列表选择 IP 许可名单作为网络方法。
  5. 配置源数据库以允许来自显示的 Datastream 公共 IP 地址的传入连接。
  6. 定义连接方法部分中,点击继续创建 MySQL 配置文件页面的测试连接配置文件部分处于活跃状态。
  7. 点击运行测试,验证源数据库和 DataStream 是否可以相互通信。
  8. 验证您是否看到“已通过测试”状态。
  9. 如果测试失败,您可以在流的适当部分解决问题,然后重新测试。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  1. 点击创建

为 Cloud Storage 创建目标连接配置文件

  1. 转到 Google Cloud Console 中 Datastream 的连接配置文件页面。
  2. 点击创建配置文件
  3. 创建连接配置文件页面中,点击 Cloud Storage 配置文件类型(因为您希望为 Cloud Storage 创建目标连接配置文件)。
  4. 创建 Cloud Storage 配置文件页面中提供以下信息:
  • 输入 My Destination Connection Profile 作为目标 Cloud Storage 服务的连接配置文件名称
  • 保留自动生成的连接配置文件 ID
  • 选择用于存储连接配置文件的地区
  • 连接详情窗格中,点击浏览以选择您在本教程前面部分创建的 my-integration-bucket-001。Datastream 会将数据从源数据库转移到该存储桶中。完成选择后,点击选择
  • 您的存储桶会显示在连接详情窗格的存储桶名称字段中。
  • 连接配置文件路径前缀字段中,提供当 Datastream 将数据流式传输到目标位置时要追加到存储桶名称的路径的前缀。在本教程中,在字段中输入 /integration/tutorial。
  • 注意:您在此字段中输入的任何路径都必须以斜杠 (/) 开头。
  1. 点击创建
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

为 MySQL 数据库创建来源连接配置文件,并为 Cloud Storage 创建目标连接配置文件后,您可以使用它们来创建数据流。

在 Datastream 中创建数据流

在本部分中,您将创建一个数据流。此数据流会将源 MySQL 数据库中的数据转移到 Cloud Storage 中的目标存储桶。

创建数据流涉及到:

  • 定义数据流的设置。
  • 选择您为源数据库创建的连接配置文件(来源连接配置文件)。在本教程中,我们使用的是我的来源连接配置文件
  • 通过在源数据库中指定 Datastream 对其执行以下操作的表和架构,配置数据流的源数据库的相关信息:
  • 可以转移到目标位置。
  • 无法转移到目标位置。
  • 确定 Datastream 是回填历史数据并将进行中的更改流式传输到目标位置,还是仅流式传输对数据的更改。
  • 选择您为 Cloud Storage 创建的连接配置文件(目标连接配置文件)。在本教程中,我们使用的是我的目标连接配置文件
  • 配置有关数据流的目标存储桶的信息。此类信息包括:
  • DataStream 将架构、表和数据从来源数据库转移到的目标存储桶的文件夹。
  • 写入 Cloud Storage 的文件的输出格式。Datastream 目前支持两种输出格式:Avro 和 JSON。在本教程中,使用 Avro 文件格式。

Dataflow 配备了向导,可帮助您创建数据流。此向导包含六个面板:使用入门、定义和测试来源、配置来源、定义目标位置、配置目标位置以及审核并创建。本页面的各个部分介绍了如何填充每个面板。

定义数据流的设置

  1. 转到 Google Cloud Console 中数据流页面。
  2. 创建数据流页面的定义数据流详情面板中提供以下信息:
  • 输入 My Stream 作为数据流名称
  • 保留自动生成的数据流 ID
  • 地区菜单中,选择您在其中创建了来源和目标连接配置文件的地区。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
  • 来源类型菜单中,选择 MySQL 配置文件类型。
  • 目标类型菜单中,选择 Cloud Storage 配置文件类型。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

选择目标连接配置文件

  1. 目标连接配置文件菜单中,选择 Cloud Storage 的目标连接配置文件。
  2. 点击继续。系统会显示创建数据流页面的配置数据流目标位置面板。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

配置有关数据流目标位置的信息

  1. 输出格式字段中,选择写入 Cloud Storage 的文件的格式。在本教程中,使用 Avro 文件格式。
  2. 数据流路径前缀字段中,您可以提供一个在 Datastream 将数据转移到目标位置时要追加到存储桶名称中的路径的前缀。这是您的 Cloud Storage 存储桶的路径,DataStream 会将架构、表和数据从源 MySQL 数据库转移到该存储桶中。
  3. 您在为 Cloud Storage 创建目标连接配置文件时,已提供 /integration/tutorial 路径。因此,您无需填充此字段。
  4. 点击继续。系统会显示创建数据流页面的审核数据流详情并创建面板。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

创建数据流

  1. 验证数据流的详细信息,以及数据流用于将数据从源 MySQL 数据库转移到 Cloud Storage 中目标存储桶的来源和目标连接配置文件。
  2. 点击运行验证来验证数据流。通过验证数据流,Datastream 会检查来源是否配置正确,验证数据流是否可以连接到来源和目标位置,并验证数据流的端到端配置。
  3. 如果验证检查通过,则验证检查的左侧会显示一个对勾标记图标。
  4. 如果验证检查未通过,则检查的左侧会显示一个英文感叹号图标,并且检查下方会显示查看错误详情按钮。点击该按钮后,系统会显示一个弹出式窗口,其中说明了检查未通过的原因以及如何纠正问题。进行适当的更正后,请点击重新验证
  5. 如需详细了解如何排查未通过的验证检查,请参阅诊断问题。
  6. 所有验证检查都通过后,点击创建
  7. 创建数据流?对话框中,点击创建

创建数据流后,您可以启动它。

Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

启动数据流

在本教程的上一部分中,您创建了一个数据流,但并未启动它。您现在可以进行启动。

在本教程中,您将分别创建和启动数据流,以防数据流创建过程造成源数据库负载增加。如需消除该负载,您需要创建数据流但不启动它,然后在可产生负载时启动数据流。

通过启动数据流,Datastream 可以将数据、架构和表从源数据库转移到目标位置。

  1. 选择要启动的数据流左侧的复选框。在本教程中,这是我的数据流
  2. 点击启动
  3. 在对话框中,点击启动。数据流的状态从 Not started 更改为 Starting,再更改为 Running。
  4. 数据流大约需要 30 秒才能开始运行。需要启动后台资源,然后才能启动数据流。

启动数据流后,您可以验证 Dataflow 是否将数据从源数据库转移到目标位置。

验证数据流

在本部分中,您将确认 Dataflow 将数据从源 MySQL 数据库的所有表转移到 Cloud Storage 目标存储桶的 /integration/tutorial 文件夹中。在本教程中,您的存储桶的名称为 my-integration-bucket-001页面。

  1. 您点击的链接由存储桶的名称以及 Datastream 将架构、表和数据从源数据库转移到的存储桶的文件夹组成。在为 Cloud Storage 创建目标连接配置文件时,您已将此文件夹指定为 /integration/tutorial。因此,链接应显示为 my-integration-bucket-001/integration/tutorial
  2. 验证您是否看到表示源数据库的表的文件夹。
  3. 点击其中一个表文件夹并展开细目,直到您看到与该表关联的数据。
  4. 第一个文件夹是 [schema]_[table],后跟文件夹表示 Datastream 将数据从源数据库转移到 Cloud Storage 中的目标存储桶的具体时间(年、月、日、小时和分钟)。
  5. 每分钟创建一个文件夹(当要写入新数据时)。
  6. 当文件大小达到 250 MB 或架构发生更改时,系统将创建一个新文件。如果表已分区,则系统会为每个分区创建文件。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

创建 Dataflow 作业

在本部分中,您将在 Dataflow 中创建作业。在 Dataflow 将源 MySQL 数据库中的数据更改流式传输到 Cloud Storage 存储桶后,系统会向 Dataflow 发送包含更改的新文件的通知。Dataflow 作业会处理这些文件并将更改转移到 BigQuery。

  1. 点击从模板创建作业
  2. 从模板创建作业页面的作业名称字段中,输入您要创建的 Dataflow 作业的名称。在本教程中,在字段中输入 my-dataflow-integration-job。
  3. 地区端点菜单中,选择存储作业的地区。此地区就是您为所创建的来源连接配置文件、目标连接配置文件和数据流选择的地区。
  4. 从 Dataflow 模板菜单中,选择要用于创建作业的模板。在本教程中,请选择 Datastream to BigQuery
  5. 进行选择后,将显示与此模板相关的其他字段。
  6. Datastream to BigQuery 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到 BigQuery 中。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将其复制到时间分区的 BigQuery 暂存表中。复制后,该模板会在 BigQuery 中执行 MERGE,将所有变更数据捕获获 (CDC) 更改插入/更新到源表的副本中。
  7. 为了最大限度降低频繁 MERGE 操作产生的费用,我们建议首先采用初始频率,即每 6-12 小时操作一次。完成所有回填且顺利复制数据后,请将此值降低至所需的频率。
  8. 如需详细了解 Datastream to BigQuery 模板,请参阅 Datastream to BigQuery (Stream)。
  9. 在 Cloud Storage 中 Datastream 文件输出的文件位置字段中,输入包含 Cloud Storage 存储桶的名称的路径。在本教程中,在字段中输入 gs://my-integration-bucket-001。
  10. 在 Cloud Storage 通知政策中使用的 Pub/Sub 订阅字段中,输入包含 Pub/Sub 订阅名称的路径。在本教程中,在字段中输入 projects/project-name/subscriptions/my_integration_notifs_sub。
  11. project-name 是您的 Google Cloud 项目名称的占位符。此外,您在本教程的为 Cloud Storage 存储桶启用 Pub/Sub 通知部分创建了 my_integration_notifs_sub 订阅。
  12. 在 Datastream 输出文件格式 (avro/json) 字段中输入 avro,因为在本教程中,Avro 是 Datastream 写入 Cloud Storage 的文件的格式。
  13. 包含暂存表的数据集的名称或模板字段中输入 my-dataflow-integration-job,因为 Dataflow 将使用此数据集暂存从 Datastream 接收的数据更改。
  14. 包含副本表的数据集的模板字段中输入 My_integration_dataset_final,因为将在此数据集中合并暂存在 My_integration_dataset_log 数据集中的更改,以创建源数据库中表的一对一副本。
  15. 您在本教程的在 BigQuery 中创建数据集部分创建了 My_integration_dataset_log 和 My_integration_dataset_final 数据集。
  16. 死信队列目录字段中,输入包含 Cloud Storage 存储桶的名称和死信队列文件夹的路径。Dataflow 无法转移到 BigQuery 的数据更改都将存储在队列中。您可以修复队列中的内容,以便 Dataflow 可以重新处理它。
  17. 在本教程中,请在死信队列目录字段中输入 gs://my-integration-bucket-001/(其中,dlq 是死信队列的文件夹)。
  18. 点击运行作业

您的 Dataflow 作业可能需要几分钟时间才能运行。需要运行后台资源。此外,作业必须先处理包含从 Cloud Storage 接收的数据更改的文件,然后才能将这些更改转移到 BigQuery 中。

Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

验证集成

在本教程的验证数据流部分中,您已确认 Datastream 将数据从源 MySQL 数据库的所有表转移到 Cloud Storage 目标存储桶的 /integration/tutorial 文件夹中。

在本部分中,您将验证 Dataflow 是否处理包含与此数据关联的文件,以及是否将更改传输到 BigQuery。因此,Datastream 和 BigQuery 之间具有端到端集成。

  1. 转到 Google Cloud Console 中 BigQuery 的 SQL 工作区页面。
  2. 探索器窗格中,展开 Google Cloud 项目名称左侧的节点。
  3. 展开 My_integration_dataset_log 和 My_integration_dataset_final 数据集左侧的节点。
  4. 验证每个数据集现在是否包含数据。这会确认 Dataflow 已处理与 Datastream 流式传输到 Cloud Storage 中的数据相关联的文件,并且已将这些更改转移到 BigQuery 中。
Datastream 和 Dataflow 进行分析MySQL以CDC的方式同步到BigQuery

问题描述:

问题一:在操作中需要把sql binlog 日志开启

问题二:在 Cloud Storage 中 Datastream 文件输出的文件位置精确到文件avro

发表评论

您的电子邮箱地址不会被公开。