使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

作者:MeshCloud脉时云公有云架构师 于文宝

引言:

产品优势:

快速进行流式数据分析

Dataflow 可实现快速、简化的流式数据流水线开发,且数据延迟时间更短。

简化运营和管理

Dataflow 的无服务器方法消除了数据工程工作负载的运营开销,让团队可以专注于编程,而不必管理服务器集群。

降低总体拥有成本

资源自动扩缩功能搭配费用优化的批处理功能,使得 Dataflow 可提供几乎无限的容量来管理季节性和峰值工作负载,而不会让您过度开支。

目标事件:

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中

在您的 Google Cloud 项目中启用 Dataflow 和其他所需的 API。

使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

创建对象存储痛

使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

创建 BigQuery 数据集

  1. 对于数据集 ID,输入 taxirides。
  2. 对于数据位置,选择 us (multiple regions in the United States)(美国 [美国的多个区域])
使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

创建 BigQuery 表

  1. 点击 taxirides 数据集旁边的 查看操作,然后点击打开。
  2. 在详细信息面板中,点击创建表
  3. 架构部分,点击以文本形式修改切换开关 已文本形式修改字段ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
  4. 在 Partitioning and cluster settings(分区和集群设置)部分中,对于分区,选择时间戳字段
使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线
使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

运行流水线

  1. 前往 Dataflow 作业页面。
  2. 点击基于模板创建作业。
  3. 输入 taxi-data 作为 Dataflow 作业的名称
  4. 对于 Dataflow 模板,选择 Pub/Sub Topic to BigQuery 模板。
  5. 创建topic projects/pubsub-public-data/topics/taxirides-realtime对于此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:
{
  "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
  "point_idx": 217,
  "latitude": 40.75399,
  "longitude": -73.96302,
  "timestamp": "2021-03-08T02:29:09.66644-05:00",
  "meter_reading": 6.293821,
  "meter_increment": 0.029003782,
  "ride_status": "enroute",
  "passenger_count": 1
}
  1. BigQuery 输出表,输入以下内容mec-test-344202:taxirides.realtime
  2. 对于临时位置,输入以下内容 gs://BUCKET_NAME/temp/将 BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称
  3. 命令行创建:
gcloud dataflow jobs run taxi-data --gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery --region us-central1 --staging-location gs://hadoop-hive/test/ --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=mec-test-344202:taxirides.realtime
使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

执行过程如图:

使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

查看结果

  1. 前往 BigQuery。去查询结果
  2. 查询sql :SELECT * FROM `mec-test-344202.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
使用 Dataflow 模板创建从 Pub/Sub 到 BigQuery 的流处理流水线

问题补充:

BigQuery 创建表需要提前创建表的字段和分区,以时间时间戳查询结果

发表评论

您的电子邮箱地址不会被公开。