《Trino权威指南2》第七章:高级连接器示例

前文引用:
《Trino权威指南2》第一章:Trino介绍
《Trino权威指南2》第二章:安装和配置Trino
《Trino权威指南2》第三章:使用Trino
《Trino权威指南2》第四章:Trino的架构
《Trino权威指南2》第五章:生产就绪部署
《Trino权威指南2》第六章:连接器

现在您已经了解连接器从第 6 章提供给 Trino 的功能以及如何配置它们。让我们将这些知识扩展到一些更复杂的使用场景和连接器。这些通常是需要足够智能的连接器,以将底层数据源的存储模式和思想转换为 SQL 和 Trino 的表导向模型。

通过直接转到关于您想要使用 Trino 连接和使用 SQL 进行查询的系统的部分,进一步了解:

  • “使用 Phoenix 连接到 HBase”
  • “键-值存储连接器示例:Accumulo”
  • “Apache Cassandra 连接器”
  • “流系统连接器示例:Kafka”
  • “文档存储连接器示例:Elasticsearch”

然后,您可以通过学习“Trino 中的查询联邦”中的查询联邦和相关的 ETL 用法来完善您的理解。

与 Phoenix 连接到 HBase

分布式、可扩展的大数据存储 Apache HBase 构建在 HDFS 之上。然而,用户并不局限于使用低级别的 HDFS 并使用 Hive 连接器访问它。Apache Phoenix 项目提供了一个 SQL 层来访问 HBase,而多亏了 Trino Phoenix 连接器,因此您可以像访问任何其他数据源一样从 Trino 访问 HBase 数据库。

和往常一样,您只需一个目录文件,比如 etc/catalog/bigtables.properties:

connector.name=phoenix5
phoenix.connection-url=jdbc:phoenix:zookeeper1,zookeeper2:2181:/hbase

连接 URL 是到数据库的 JDBC 连接字符串。它包括用于发现 HBase 节点的 Apache ZooKeeper 节点列表。

Phoenix 模式和表被映射到 Trino 模式和表,您可以使用常规的 Trino 语句检查它们:

SHOW SCHEMAS FROM bigtable;
SHOW TABLES FROM bigtable.example;
SHOW COLUMNS FROM bigtable.examples.user;

现在,您已经准备好查询任何 HBase 表并在下游工具中使用它们,就像从连接到 Trino 的任何其他数据源获取的数据一样。

使用 Trino 允许您查询 HBase,同时享受水平扩展的 Trino 的性能优势。您创建的任何查询都可以访问 HBase 和任何其他目录,允许您将 HBase 数据与其他源合并到联合查询中。

键值存储连接器示例:Accumulo

Trino 包含多个用于连接键值数据存储的连接器。键值存储是一种通过使用唯一键来存储和检索记录字典的系统。想象一下哈希表,通过键检索记录。这个记录可以是单个值、多个值,甚至是一个集合。

存在许多具有不同功能的键值存储系统。一个广泛使用的系统是开源的、宽列存储数据库 Apache Cassandra,Trino 提供了对其的连接器。您可以在“Apache Cassandra 连接器”中找到更多信息。

我们现在将详细讨论的另一个示例是 Apache Accumulo。它是一个高性能、广泛使用的开源键值存储,可以通过 Trino 连接器进行查询。一般的概念可以转化到其他键值存储。我们使用 Accumulo 连接器作为示例,展示连接器需要完成什么任务,将不同系统的上下文映射到 Trino 的概念。

受 Google 的 BigTable 启发,Apache Accumulo 是一个用于可扩展存储和检索的排序分布式键值存储。Accumulo 将键值数据存储在按键排序的 HDFS 上。

图 7-1 显示了在 Accumulo 中,键由行 ID、列和时间戳三元组组成。首先按行 ID 和列按升序字典顺序排序,然后按时间戳按降序顺序排序。

图片

通过利用列族和局部组,可以进一步优化 Accumulo。对于 Trino 来说,大部分操作都是透明的,但了解 SQL 查询的访问模式可能有助于优化创建 Accumulo 表。这与为使用 Accumulo 的任何其他应用程序优化表是相同的。

让我们看一下表 7-1 中关系表的逻辑表示。

图片

由于 Accumulo 是一个键值存储,它将数据的这种表示方式与逻辑视图不同地存储在磁盘上,如表 7-2 所示。这种非关系存储使得确定 Trino 如何从中读取数据变得不那么直观。

图片

Trino Accumulo 连接器负责将 Accumulo 数据模型映射为 Trino 能够理解的关系模型。 图 7-2 显示 Accumulo 使用 HDFS 进行存储,并使用 ZooKeeper 管理有关表的元数据。

图片

从根本上说,Accumulo 是一个由主节点和多个分片服务器组成的分布式系统,如图 7-3 所示。分片服务器包含并公开分片,这是表的水平分区片段。客户端直接连接到分片服务器以扫描所需的数据。

图片

就像 Accumulo 本身一样,Trino Accumulo 连接器也使用 ZooKeeper。它从 Accumulo 使用的 ZooKeeper 实例中读取所有信息,例如表、视图、表属性和列定义。

让我们看看如何从 Trino 中的 Accumulo 中扫描数据。在 Accumulo 中,可以使用 Scanner 对象从表中读取键值对。扫描器从表的特定键开始读取,并在另一个键或表的末尾结束。扫描器可以配置为仅读取所需的确切列。回想一下关系型数据库连接器,只有需要的列会添加到生成的 SQL 查询中以推送到数据库中。

Accumulo 还有一个 BatchScanner 对象的概念。在跨多个范围从 Accumulo 读取时使用它。这更有效,因为它能够使用多个工作节点与 Accumulo 通信,如图 7-4 所示。 用户首先将查询提交给协调器,协调器与 Accumulo 通信以确定来自元数据的拆分。它通过查找 Accumulo 中的可用索引中的范围来确定拆分。Accumulo 返回索引中的行 ID,Trino 将这些范围存储在拆分中。如果无法使用索引,一个拆分将用于单个分片中的所有范围。最后,工作节点使用这些信息连接到特定的分片服务器,并从 Accumulo 中并行提取数据。这通过使用 Accumulo 的 BatchScanner 工具从数据库中提取数据。

图片

一旦数据从工作节点中拉回,数据就会被放入 Trino 能理解的关系格式中,并且 Trino 会完成余下的处理。在这种情况下,Accumulo 用于数据存储,Trino 提供了对 Accumulo 中的数据的高级 SQL 接口。

如果您自己编写应用程序来从 Accumulo 检索数据,您可能会编写类似以下 Java 片段。您设置要扫描的范围,并定义要获取的列:

ArrayList<Range> ranges = new ArrayList<Range>();
ranges.add(new Range("1234"));
ranges.add(new Range("5678"));

BatchScanner scanner = client.createBatchScanner("flights", auths, 10);
scanner.setRanges(ranges);
scanner.fetchColumn("flightdate");
scanner.fetchColumn("flightnum");
scanner.fetchColumn("origin");

for (Entry<Key,Value> entry : scanner) {
  // populate into Trino format
}

不需要读取的列的修剪概念与关系型数据库连接器相似。Accumulo 连接器不使用 SQL 下推,而是使用 Accumulo API 设置要获取哪些列。

使用 Trino Accumulo 连接器

要使用 Accumulo,请创建一个目录属性文件(例如,etc/catalog/accumulo.properties),该文件引用 Accumulo 连接器并配置 Accumulo 访问,包括与 ZooKeeper 的连接:

connector.name=accumulo
accumulo.instance=accumulo
accumulo.zookeepers=zookeeper.example.com:2181
accumulo.username=user
accumulo.password=password

使用之前的航班示例,让我们使用 Trino 在 Accumulo 中创建一个表,可以使用 Trino CLI 或通过 JDBC 连接到 Trino 的 RDBMS 管理工具:

CREATE TABLE accumulo.ontime.flights (
    rowid VARCHAR,
    flightdate VARCHAR,
    flightnum INTEGER,
    origin VARCHAR,
    dest VARCHAR
);

在 Trino 中创建此表时,连接器实际上会在 Accumulo 中创建一个表,并在 ZooKeeper 中创建关于表的元数据。

还可以创建列族。在 Accumulo 中,列族是一种用于访问一起访问的列的应用程序的优化器。通过定义列族,Accumulo 安排如何存储在磁盘上的列,以便作为列族的一部分频繁访问的列被一起存储。如果要使用列族创建表,可以将其指定为 WITH 语句中的表属性:

CREATE TABLE accumulo.ontime.flights (
    rowid VARCHAR,
    flightdate VARCHAR,
    flightnum INTEGER,
    origin VARCHAR,
    dest VARCHAR
)
WITH
   column_mapping = 'origin:location:origin,dest:location:dest';

通过使用 column_mapping,您可以定义一个包含列修饰符 origin 和 dest 的列族位置,这与 Trino 列名称相同。

Trino Accumulo 连接器支持 INSERT 语句:

INSERT INTO accumulo.ontime.flights VALUES
    (2232, '2019-10-19', 118, 'JFK', 'SFO');

这是插入数据的一种方便方式。然而,当从 Trino 向 Accumulo 写入数据时,当前的吞吐量较低。为了获得更好的性能,您需要使用本机的 Accumulo API。Accumulo 连接器在 Trino 之外提供了用于插入数据的高性能辅助工具。您可以在 Trino 文档的独立工具中找到有关加载数据的更多信息。

我们在前面的示例中创建的表是内部表。Trino Accumulo 连接器支持内部表和外部表。这两种类型之间唯一的区别是删除外部表仅删除元数据而不删除数据本身。外部表允许您在 Accumulo 中创建已经存在的 Trino 表。此外,如果需要更改模式,比如添加列,您可以简单地在 Trino 中删除表并重新创建它,而不会丢失数据。值得注意的是,当每行不需要具有相同的列集时,Accumulo 可以支持此模式演变。

使用外部表需要更多的工作,因为数据已经以特定的方式存储。例如,在使用外部表时,必须使用 column_mapping 表属性。在创建表时,必须将 external 属性设置为 true:

CREATE TABLE accumulo.ontime.flights (
    rowid VARCHAR,
    flightdate VARCHAR,
    flightnum INTEGER,
    origin VARCHAR,
    dest VARCHAR
)
WITH
    external = true,
    column_mapping = 'origin:location:origin,dest:location:dest';

在 Accumulo 中的谓词下推

在 Accumulo 连接器中,Trino 可以利用 Accumulo 中内建的二级索引。为实现这一点,Accumulo 连接器需要在每个 Accumulo tablet 服务器上安装一个自定义的服务端迭代器。该迭代器以 JAR 文件的形式分发,您需要将其复制到每个 tablet 服务器上的$ACCUMULO_HOME/lib/ext 目录中。您可以在 Trino 文档中找到关于如何执行此操作的详细信息。

在 Accumulo 中,索引用于查找行 ID,然后可以使用这些 ID 从实际表中读取值。让我们看一个例子:

SELECT flightnum, origin
FROM flights
WHERE flightdate BETWEEN DATE '2019-10-01' AND '2019-11-05'
AND origin = 'BOS';

如果没有索引,Trino 将从 Accumulo 中读取整个数据集,然后在 Trino 内部进行过滤。工作节点获取包含要读取的 Accumulo 范围的切片。这个范围是整个 tablet 的范围。有了索引,比如表 7-3 中的示例索引,可以显著减少需要处理的范围数量。

图片

协调器使用 WHERE 子句和过滤器 flightdate BETWEEN DATE '2019-10-01' AND '2019-11-05' AND origin = 'BOS'扫描索引,以获取表的行 ID。然后,这些行 ID 被打包到工作节点稍后用于访问 Accumulo 中的数据的切片中。在我们的例子中,我们在 flightdate 和 origin 上有二级索引,我们收集了行 ID {2232, 1234, 5478} 和 {3498, 1234, 5678}。我们对每个索引取交集,知道我们只需扫描行 ID {1234, 5678}。然后,这个范围被放入切片中,由工作节点进行处理,它可以直接访问个别的值,就像在表 7-4 的数据详细视图中所示。

图片

为了充分利用谓词推送,我们需要在要推送谓词的列上建立索引。通过 Trino 连接器,可以通过 index_columns 表属性轻松启用对列的索引:

CREATE TABLE accumulo.ontime.flights (
    rowid VARCHAR,
    flightdate VARCHAR,
    flightnum INTEGER,
    origin VARCHAR,
    dest VARCHAR
)
WITH
    index_columns = 'flightdate,origin';

在这个关于 Apache Accumulo 的部分,您了解了键值存储以及 Trino 如何使用标准 SQL 查询它。让我们看看另一个更为广泛的系统,也可以从 Trino 中受益:Apache Cassandra。

Apache Cassandra 连接器

Apache Cassandra 是一种分布式的、支持大规模数据的宽列存储系统。其容错架构和线性可扩展性导致了 Cassandra 的广泛应用。

在 Cassandra 中处理数据的典型方法是使用专为 Cassandra 创建的自定义查询语言:Cassandra 查询语言(CQL)。虽然 CQL 在表面上看起来很像 SQL,但实际上它缺少 SQL 许多有用的功能,比如联接。总体而言,它与 SQL 有足够的不同,使得使用依赖于 SQL 的标准工具变得不可能。

然而,通过使用 Cassandra 连接器,您可以允许对 Cassandra 中的数据进行 SQL 查询。最小的配置是一个简单的目录文件,例如对于跟踪网站上所有用户交互的 Cassandra 集群,配置文件可以是 etc/catalog/sitedata.properties,内容如下:

connector.name=cassandra
cassandra.contact-points=sitedata.example.com

有了这个简单的配置,用户就可以查询 Cassandra 中的数据了。Cassandra 中的任何 keyspace(例如,cart)在 Trino 中都被公开为一个模式,现在可以使用正常的 SQL 查询诸如 users 之类的表:

SELECT * FROM sitedata.cart.users;

该连接器支持许多配置属性,允许您根据 Cassandra 集群自定义目录,为连接启用身份验证和 TLS,等等。

流处理系统连接器示例:Kafka

流处理系统和发布-订阅(pub/sub)系统旨在处理实时数据流。例如,Apache Kafka 被设计为 LinkedIn 的高吞吐量和低延迟平台。发布者将消息写入 Kafka,供订阅者消费。这样的系统通常用于系统之间的数据流水线。通常情况下,Trino Kafka 连接器用于从 Kafka 读取数据,但您也可以使用连接器发布数据。

使用连接器,您可以使用 SQL 查询 Kafka 主题上的数据,甚至与其他数据进行连接。在 Trino 中,典型用例是对实时 Kafka 主题流进行临时查询,以检查和更好地了解当前系统中流动的状态和数据。使用 Trino 使这对于数据分析师和其他通常不具有任何特定 Kafka 知识但了解如何编写 SQL 查询的用户变得更加容易和可访问。

Trino 与 Kafka 的另一个不太常见的用例是从 Kafka 迁移数据。使用 CREATE TABLE AS 或 INSERT SELECT 语句,您可以从 Kafka 主题读取数据,使用 SQL 转换数据,然后将其写入 HDFS、S3 或其他存储。

由于 Kafka 是一个流处理系统,暴露的主题会随着新数据的到来而不断变化。在使用 Trino 查询 Kafka 主题时必须考虑这一点。使用 Trino 将数据迁移到 HDFS 或具有永久存储的另一个数据库系统允许保留通过 Kafka 主题传递的信息。

一旦数据在目标数据库或存储中永久可用,Trino 可以用于将其暴露给诸如 Apache Superset 之类的分析工具;参见“使用 Apache Superset 进行查询、可视化等”。

使用 Kafka 连接器的方式与任何其他连接器一样。创建一个目录(例如,etc/catalog/trafficstream.properties),使用 Kafka 连接器,配置任何其他必需的细节,并指向您的 Kafka 集群:

connector.name=kafka
kafka.table-names=web.pages,web.users
kafka.nodes=trafficstream.example.com:9092

现在,来自 Kafka web.pages 和 web.users 的每个主题都作为 Trino 中的表可用。随时,该表会暴露包含主题中所有当前消息的整个 Kafka 主题。在 Trino 上,使用目录、模式和表名称,现在可以轻松通过 SQL 查询数据:

SELECT * FROM trafficstream.web.pages;
SELECT * FROM trafficstream.web.users;

基本上,您可以使用简单的 SQL 查询实时检查 Kafka 主题。

如果要迁移数据到另一个系统,比如 HDFS 目录,您可以从一个简单的 CREATE TABLE AS(CTAS)查询开始:

CREATE TABLE hdfs.web.pages
WITH (
   format = 'ORC',
   partitioned_by = ARRAY['view_date']
)
AS
SELECT *
FROM trafficstream.web.pages;

一旦表存在,您可以通过定期运行插入查询来将更多数据插入其中:

INSERT INTO hdfs.web.pages
SELECT *
FROM trafficstream.web.pages;

为了避免重复复制,您可以跟踪连接器暴露的 Kafka 的一些内部列。具体来说,您可以使用_partition_id、_partition_offset、_segment_start、_segment_end 和_segment_count。您定期运行查询的具体设置取决于用于删除消息的 Kafka 配置以及用于运行查询的工具,例如在“使用 Apache Airflow 进行工作流处理”中描述的 Apache Airflow。

将 Kafka 主题(作为表暴露)及其包含的消息的映射可以在 etc/kafka/schema.tablename.json 中的每个主题的 JSON 文件中定义。对于前面的例子,您可以在 etc/kafka/web.pages.json 中定义映射。

Kafka 消息可以使用不同的格式,Kafka 连接器包括最常见格式的解码器,包括 Raw、JSON、CSV 和 Avro。

有关配置属性、映射和其他内部列的详细信息,请参阅 Trino 文档中的“文档”。

使用 Trino 与 Kafka 打开了对通过 Kafka 传递的数据进行新的分析和洞察,并定义了 Trino 的另一个有价值的用途。Trino 还支持用于类似用途的另一个流处理系统,即 Amazon Kinesis。

文档存储连接器示例:Elasticsearch

Trino 包括连接器,用于一些知名的文档存储系统,例如 Elasticsearch 或 MongoDB。这些系统支持以类似 JSON 的文档形式存储和检索信息。Elasticsearch 更适用于索引和搜索文档,而 MongoDB 是一个通用的文档存储系统。

概述

Trino 连接器允许用户使用 SQL 访问这些系统并查询其中的数据,尽管这些系统没有原生的 SQL 访问方式。

Elasticsearch 集群通常用于存储日志数据或其他事件流,以供长期甚至永久存储。这些数据集通常非常庞大,它们可以是更好地了解发出操作日志数据的系统以及在各种场景中使用的有用资源。

Elasticsearch 和 Trino 是强大而高效的组合,因为两个系统都可以水平扩展。Trino 通过将查询拆分并在集群中的许多工作节点上运行其部分来进行扩展。

Elasticsearch 通常在自己的集群上运行,并且也可以水平扩展。它可以在许多节点上分片索引并以分布方式运行任何搜索操作。调整 Elasticsearch 集群以提高性能是一个独立的主题,需要理解搜索索引中文档的数量、集群中节点的数量、副本集、分片配置以及其他细节。

然而,从客户端的角度来看,因此也从 Trino 的角度来看,这一切都是透明的,Elasticsearch 只需使用 Elasticsearch 服务器的 URL 公开集群。

配置和使用

配置 Trino 以访问 Elasticsearch 是通过创建一个目录文件,例如 etc/catalog/search.properties 来完成的:

connector.name=elasticsearch
elasticsearch.host=searchcluster.example.com

此配置依赖于端口、模式和其他详细信息的默认值,但对于查询集群已经足够。连接器支持 Elasticsearch 的多种数据类型。它会自动分析每个索引,将每个索引配置为表,将表公开在默认模式中,创建必要的嵌套结构和行类型,并在 Trino 中公开所有这些。索引中的任何文档都会自动解压缩为 Trino 中的表结构。例如,名为 server 的索引会自动在目录的默认模式中作为表可用,您可以查询 Trino 以获取有关该结构的更多信息:

DESCRIBE search.default.server;

用户可以立即开始查询该索引。信息模式或 DESCRIBE 命令可用于了解为每个索引/模式创建的表和字段。

Elasticsearch 模式中的字段通常包含作为数组的多个值。如果自动检测不如预期,可以在索引映射的字段属性定义中添加字段属性。此外,_source 隐藏字段包含来自 Elasticsearch 的源文档,如果需要,可以使用 JSON 文档解析函数(参见“JSON 函数”)以及集合数据类型(参见“集合数据类型”)。这些通常在处理 Elasticsearch 集群中的文档(主要是 JSON 文档)时非常有用。

在 Elasticsearch 中,可以将一个或多个索引的数据公开为别名。这也可以是经过过滤的数据。Trino 连接器支持别名的使用,并将它们像任何其他索引一样公开为表。

查询处理

一旦你从 Trino 向 Elasticsearch 发出查询,Trino 利用其集群基础设施,除了已经存在的 Elasticsearch 集群,以进一步提高性能。

Trino 查询 Elasticsearch 以了解所有的 Elasticsearch 分片。然后在创建查询计划时使用这些信息。它将查询拆分成针对特定分片的单独切片,然后并行向所有分片发出单独的查询。一旦结果返回,它们在 Trino 中合并并返回给用户。这意味着 Trino 与 Elasticsearch 结合使用,可以使用 SQL 进行查询,并且比单独使用 Elasticsearch 更高效。

还要注意,在典型的 Elasticsearch 集群中,这种对特定分片的个别连接也会发生,其中集群在负载均衡器后运行,并通过 DNS 主机名进行公开。

全文检索

Elasticsearch 连接器强大的功能之一是对全文搜索的支持。它允许您在从 Trino 发出的 SQL 查询中使用 Elasticsearch 查询字符串。

例如,想象一下一个包含网站上的博客文章的索引。这些文档存储在 blogs 索引中。也许这些帖子包含许多字段,如标题、简介、文章、摘要和作者。通过全文搜索,您可以编写一个简单的查询,搜索所有字段中的整个内容,以查找特定术语,如 trino:

SELECT * FROM "blogs: +trino";

Elasticsearch 中的查询字符串语法支持加权不同的搜索术语和其他适用于全文搜索的功能。

总结

该连接器还支持 Amazon 的 Elasticsearch 开源版本 OpenSearch。使用 Amazon OpenSearch 服务的用户可以利用对 AWS Identity and Access Management 的支持。有关此配置以及如何使用 TLS 保护与 Elasticsearch 集群的连接等更多详细信息,请参阅 Trino 文档。

使用 Trino 与 Elasticsearch,您可以使用围绕 SQL 支持的强大工具分析索引中的丰富数据。您可以手动编写查询或连接丰富的分析工具。这使您能够比以前更好地了解集群中的数据。

连接 MongoDB 到 Trino 时,也可以利用 Trino MongoDB 连接器获得类似的优势。

Trino 的联邦查询

在阅读了有关 Trino 在“Trino 用例”中的所有用例,并了解了 Trino 中所有数据源和可用连接器的情况之后,您现在已经准备好深入了解 Trino 中的查询联邦。联邦查询是一种访问多个数据源中的数据的查询。这种查询可以用于将来自多个关系型数据库(例如在 PostgreSQL 上运行的企业后端应用程序数据库与在 MySQL 上运行的 Web 应用程序数据库)的内容和信息进行关联。它还可以是在 PostgreSQL 上运行的数据仓库,通过来自源头也在 PostgreSQL 或其他地方运行的数据进行查询。

然而,当您将关系型数据库的查询与针对其他非关系型系统的查询结合使用时,更强大的示例就会出现。将数据仓库的数据与对象存储中的信息相结合,其中包含来自您的 Web 应用程序的大规模数据。或者将数据关联到键值存储或 NoSQL 数据库中的内容。您的对象存储数据湖,甚至您的现代数据湖屋,可以突然通过 SQL 公开,并且这些信息可以成为更好地理解整体数据的基础。

查询联邦可以帮助您真正理解不同系统中所有数据之间的连接和依赖关系,从而更好地洞察整体局势。

在接下来的示例中,您将了解在分布式存储中联接数据与关系型数据库中的数据的用例。您可以在“Flight Data Set”中找到有关必要设置的信息。

利用这些数据,您可以通过使用 SQL 查询提出诸如“每年飞机的平均延误是多少?”这样的问题:

SELECT avg(depdelayminutes) AS delay, year
FROM flights_orc
GROUP BY year
ORDER BY year DESC;

另一个问题是“二月份从波士顿出发的最佳星期几?”:

SELECT dayofweek, avg(depdelayminutes) AS delay
FROM flights_orc
WHERE month=2 AND origincityname LIKE '%Boston%'
GROUP BY dayofmonth
ORDER BY dayofweek;

由于多个数据源和查询联邦的概念是 Trino 的一个组成部分,我们鼓励您设置一个环境并探索数据。这些查询可以激发您创建自己的附加查询的灵感。

我们使用航空数据的两个示例分析查询来演示 Trino 中的查询联邦。我们提供的设置使用存储在 S3 中并通过配置 Hive 连接器访问的数据。但是,如果您愿意,您可以将数据存储在 HDFS、Azure 存储或 Google Cloud 存储中,并使用 Hive 连接器查询数据。

在第一个示例查询中,我们希望 Trino 返回 HDFS 中数据中航空公司飞行最多的前 10 家航空公司:

SELECT uniquecarrier, count(*) AS ct
FROM flights_orc
GROUP BY uniquecarrier
ORDER BY count(*) DESC
LIMIT 10;
 uniquecarrier |    ct
---------------+----------
 WN            | 24096231
 DL            | 21598986
 AA            | 18942178
 US            | 16735486
 UA            | 16377453
 NW            | 10585760
 CO            |  8888536
 OO            |  7270911
 MQ            |  6877396
 EV            |  5391487
(10 rows)

尽管前面的查询提供了飞行最多的前 10 家航空公司的结果,但是它要求您理解 uniquecarrier 的值。如果一个更具描述性的列提供了航空公司名称的全称而不是缩写,那将会更好。然而,我们从查询的航空数据源中获取的数据不包含这样的信息。也许如果另一个包含此信息的数据源存在,我们可以将数据源组合起来以返回更易理解的结果。

让我们看另一个例子。在这里,我们希望 Trino 返回飞行最多的前 10 个机场:

SELECT origin, count(*) AS ct
FROM flights_orc
GROUP BY origin
ORDER BY count(*) DESC
LIMIT 10;
  origin |  ct
--------+---------
 ATL    | 8867847
 ORD    | 8756942
 DFW    | 7601863
 LAX    | 5575119
 DEN    | 4936651
 PHX    | 4725124
 IAH    | 4118279
 DTW    | 3862377
 SFO    | 3825008
 LAS    | 3640747
(10 rows)

与前面的查询一样,结果需要一些领域专业知识。例如,您需要理解 origin 列包含机场代码。对于对结果进行分析的专业知识较少的人来说,这个代码是没有意义的。

让我们通过将结果与关系型数据库中的附加数据结合起来,增强我们的结果。在我们的示例中,我们使用 PostgreSQL,但对于任何关系型数据库,类似的步骤也是适用的。

与航空数据一样,我们的 GitHub 仓库包含在关系型数据库中创建和加载表以及配置 Trino 连接器以访问它的设置。我们选择配置 Trino 从包含附加航空公司数据的 PostgreSQL 数据库查询。PostgreSQL 中的 carrier 表提供了航空公司代码到更具描述性的航空公司名称的映射。您可以将此附加数据与我们的第一个示例查询一起使用。

让我们看一下 PostgreSQL 中的 carrier 表:

SELECT * FROM carrier LIMIT 10;
 code |                 description
------+----------------------------------------------
 02Q  | Titan Airways
 04Q  | Tradewind Aviation
 05Q  | Comlux Aviation, AG
 06Q  | Master Top Linhas Aereas Ltd.
 07Q  | Flair Airlines Ltd.
 09Q  | Swift Air, LLC
 0BQ  | DCA
 0CQ  | ACM AIR CHARTER GmbH
 0GQ  | Inter Island Airways, d/b/a Inter Island Air
 0HQ  | Polar Airlines de Mexico d/b/a Nova Air
(10 rows)

该表包含 code 列和 description 列。利用这些信息,我们可以使用我们的第一个示例查询来连接来自 PostgreSQL carrier 表的数据和 flights_orc 表的数据:

SELECT f.uniquecarrier, c.description, count(*) AS ct
FROM datalake.ontime.flights_orc f,
    postgresql.airline.carrier c
WHERE c.code = f.uniquecarrier
GROUP BY f.uniquecarrier, c.description
ORDER BY count(*) DESC
LIMIT 10;
 uniquecarrier |        description         |    ct
---------------+----------------------------+----------
 WN            | Southwest Airlines Co.     | 24096231
 DL            | Delta Air Lines Inc.       | 21598986
 AA            | American Airlines Inc.     | 18942178
 US            | US Airways Inc.            | 16735486
 UA            | United Air Lines Inc.      | 16377453
 NW            | Northwest Airlines Inc.    | 10585760
 CO            | Continental Air Lines Inc. |  8888536
 OO            | SkyWest Airlines Inc.      |  7270911
 MQ            | Envoy Air                  |  6877396
 EV            | ExpressJet Airlines Inc.   |  5391487
(10 rows)

现在,我们已经编写了一条单一的 SQL 查询,从 S3 和 PostgreSQL 中联合数据,我们能够提供更有价值的结果以提取含义。与其必须知道或分开查找航空公司代码不同,结果中包含了描述性的航空公司名称。

在查询中,当引用表时,必须使用完全限定的名称。当使用 USE 命令设置默认的目录和架构时,非限定的表名称与该目录和架构相链接。但是,每当需要查询超出目录和架构范围的内容时,表名称必须合格。否则,Trino 尝试在默认目录和架构中查找它,并返回错误。如果您引用的是默认目录和架构中的表,则无需完全合格化表名。但是,每当引用默认范围之外的数据源时,最好作为最佳实践进行完全合格化。

接下来,让我们看看 PostgreSQL 中的 airport 表。这个表是我们联邦化第二个示例查询的一部分:

SELECT code, name, city
FROM airport
LIMIT 10;

sql
复制代码
 code |           name           |         city
------+--------------------------+----------------------
 01A  | Afognak Lake Airport     | Afognak Lake, AK
 03A  | Bear Creek Mining Strip  | Granite Mountain, AK
 04A  | Lik Mining Camp          | Lik, AK
 05A  | Little Squaw Airport     | Little Squaw, AK
 06A  | Kizhuyak Bay             | Kizhuyak, AK
 07A  | Klawock Seaplane Base    | Klawock, AK
 08A  | Elizabeth Island Airport | Elizabeth Island, AK
 09A  | Augustin Island          | Homer, AK
 1B1  | Columbia County          | Hudson, NY
 1G4  | Grand Canyon West        | Peach Springs, AZ
(10 rows)

从 PostgreSQL 的这些数据中,你可以看到 code 列可以与我们在 flight_orc 表上的第二个查询进行连接。这允许你在查询中使用 airport 表中的附加信息以提供更多细节:

SELECT f.origin, c.name, c.city, count(*) AS ct
FROM hive.ontime.flights_orc f,
    postgresql.airline.airport c
WHERE c.code = f.origin
GROUP BY origin, c.name, c.city
ORDER BY count(*) DESC
LIMIT 10;
 origin |                   name                   |         city          |   ct
--------+------------------------------------------+-----------------------+---------
 ATL    | Hartsfield-Jackson Atlanta International | Atlanta, GA           | 8867847
 ORD    | Chicago OHare International              | Chicago, IL           | 8756942
 DFW    | Dallas/Fort Worth International          | Dallas/Fort Worth, TX | 7601863
 LAX    | Los Angeles International                | Los Angeles, CA       | 5575119
 DEN    | Denver International                     | Denver, CO            | 4936651
 PHX    | Phoenix Sky Harbor International         | Phoenix, AZ           | 4725124
 IAH    | George Bush Intercontinental/Houston     | Houston, TX           | 4118279
 DTW    | Detroit Metro Wayne County               | Detroit, MI           | 3862377
 SFO    | San Francisco International              | San Francisco, CA     | 3825008
 LAS    | McCarran International                   | Las Vegas, NV         | 3640747
(10 rows)

与我们的第一个示例一样,通过在两个不同的数据源之间进行联邦,我们可以提供更有意义的信息。在这里,我们能够添加机场的名称,而不是让用户依赖难以解释的机场代码。

通过这个查询联邦的快速示例,你可以看到不同数据源的组合以及在 Trino 中的中央查询可以极大地改进查询结果。我们的例子仅增强了结果的外观和可读性。然而,在许多情况下,使用更丰富、更大的数据集,查询的联邦以及来自不同来源的数据的组合可能会导致对数据完全新的理解。

现在我们已经从最终用户的角度通过了一些查询联邦的示例,让我们讨论一下这是如何工作的架构。我们在 Trino 架构的第 4 章学到的一些概念的基础上构建。

Trino 能够协调查询在涉及的数据源之间的混合执行。在之前的示例中,我们在分布式存储和 PostgreSQL 之间进行查询。对于通过 Hive 连接器进行的分布式存储,Trino 直接读取数据文件,无论它们来自 HDFS、S3、Azure Blob Storage 等。对于 PostgreSQL 连接器这样的关系数据库连接器,Trino 依赖于 PostgreSQL 作为执行的一部分。让我们使用之前的查询,但为了使其更有趣,我们添加一个引用 PostgreSQL airport 表中列的新谓词:

SELECT f.origin, c.name, c.city, count(*) AS ct
FROM datalake.ontime.flights_orc f,
  postgresql.airline.airport c
WHERE c.code = f.origin AND c.state = 'AK'
GROUP BY origin, c.name, c.city
ORDER BY count(*) DESC
LIMIT 10;

逻辑查询计划类似于图 7-5。你可以看到计划包括扫描 flights_orc 和 airport 表。两个输入都被馈送到连接运算符。但在机场数据馈送到连接之前,应用了一个过滤器,因为我们只想查看阿拉斯加州的机场的结果。连接之后,应用了聚合和分组操作。最后,TopN 运算符执行 ORDER BY 和 LIMIT 的组合。

图片

为了从 PostgreSQL 中检索数据,Trino 通过 JDBC 发送查询。例如,在朴素的方法中,以下查询被发送到 PostgreSQL:

SELECT * FROM airline.airport;

然而,Trino 比这更聪明,Trino 优化器试图减少在系统之间传输的数据量。在这个例子中,Trino 仅查询它从 PostgreSQL 表中需要的列,并将谓词推送到发送到 PostgreSQL 的 SQL 中。

因此,现在从 Trino 发送到 PostgreSQL 的查询将更多的处理推送到 PostgreSQL:

SELECT code, city, name FROM airline.airport WHERE state = 'AK';

作为 JDBC 连接器返回数据到 Trino,Trino 继续处理在 Trino 查询引擎中执行的部分的数据。一些简单的查询,比如 SELECT * FROM public.airport,完全被推送到底层数据源,如图 7-6 所示,使得查询执行在 Trino 之外,Trino 充当一个透传的角色。

对于更复杂的 SQL 推送,取决于连接器。例如,只涉及关系数据库管理系统(RDBMS)数据的连接器可以将涉及的连接推送到 PostgreSQL 以消除数据传输到 Trino。许多聚合函数也可以由 PostgreSQL 连接器推送下去,而其他连接器未必能够执行此操作。

图片

提取、转换、加载和联邦查询

提取、转换、加载(ETL)是用来描述将数据从数据源复制并导入另一个数据源的技术术语。通常在将数据着陆到目标之前,还有一个中间步骤来对来自源的数据进行转换。这可能包括删除列、进行计算、过滤和清理数据、合并数据、执行预聚合等操作,以准备数据并使其适用于在目标中查询。在某些用例中,这些操作的顺序会改变,这个过程被称为提取、加载和转换(ELT)。虽然存在一些关键的差异,Trino 适用于这两种方法。

Trino 并不打算成为一个与商业解决方案相媲美的完整的 ETL 工具。然而,它可以通过避免需要 ETL 来提供帮助。由于 Trino 可以从数据源查询,可能就不再需要移动数据了。Trino 在数据所在的位置查询数据,从而减轻了管理 ETL 过程的复杂性。

您仍然可能希望执行某种类型的 ETL 过程。也许您想在预聚合的数据上进行查询,或者您不想给底层系统增加更多负载。通过使用 CREATE TABLE AS 或 INSERT SELECT 结构,您可以将数据从一个数据源移动到另一个数据源。

Trino 可以在这些和其他场景中提供帮助,并作为由其他工具编排的 ETL 工作负载的查询处理和执行引擎。

在使用 Trino 进行 ETL 工作负载和用例时的一个巨大优势是它支持超出关系数据库之外的其他数据源。对查询的容错执行的新支持使 Trino 在与现代数据湖仓库设置结合使用的 ETL 用例中成为一流的查询引擎。结合 Trino 相对于许多其他 ETL 解决方案的性能优势以及现代数据流水线工具(如 dbt、Apache Flink 和 Apache Airflow)的集成,Trino 正在超越纯分析用例,成为现代数据湖仓库和数据平台架构的重要组成部分。

总结

你现在对 Trino 中的连接器有了很好的理解。是时候充分利用它们了。配置你的目录,准备好深入了解有关查询数据源的更多信息。

这将我们带到了下一个主题,即 Trino 中 SQL 的方方面面。SQL 知识对于成功使用 Trino 至关重要,我们将在第 8 章和第 9 章中涵盖你需要了解的一切。

发表回复