《Trino权威指南2》第六章:连接器

前文引用:
《Trino权威指南2》第一章:Trino介绍
《Trino权威指南2》第二章:安装和配置Trino
《Trino权威指南2》第三章:使用Trino
《Trino权威指南2》第四章:Trino的架构
《Trino权威指南2》第五章:生产就绪部署

在第 3 章中,您配置了一个目录,使用连接器访问 Trino 中的数据源,具体而言,是 TPC-H 基准数据,并学习了如何使用 SQL 查询该数据的一些知识。

目录是使用 Trino 的重要方面。它们定义了与底层数据源和存储系统的连接,并使用连接器、模式和表等概念。这些基本概念在第 4 章中进行了描述,并且在第 8 章中更详细地讨论了它们在 SQL 中的使用。

连接器将底层数据源(如关系型数据库管理系统、对象存储或键值存储)的查询和存储概念转换为 Trino 的表、列、行和数据类型的 SQL 和 Trino 概念。这些转换可以是简单的 SQL 到 SQL 的翻译和映射,也可以是从 SQL 到对象存储或 NoSQL 系统的更复杂的转换。这些还可以是用户定义的。

您可以将连接器视为数据库驱动程序的方式。它将用户输入转换为底层数据库可以执行的操作。每个连接器都实现了与连接器相关的 Trino SPI 方面。这使得 Trino 能够让您使用相同的 SQL 工具来处理连接器公开的任何底层数据源,并使 Trino 成为一个适用于任何数据源的 SQL 系统。

连接器的实现也会影响查询性能。最基本的连接器建立一个到数据源的单个连接,并向 Trino 提供数据。然而,更高级的连接器可以将语句拆分为多个连接,以并行执行操作,从而实现更好的性能。连接器的另一个高级特性是提供表统计信息,这些统计信息可以由基于成本的优化器用于创建高性能的查询计划。然而,这样的连接器实现更加复杂。

Trino 提供了许多连接器:

  • 针对关系型数据库管理系统(如 PostgreSQL 或 MySQL)的连接器,参见《RDBMS 连接器示例:PostgreSQL》。
  • 适用于使用 HDFS 和类似对象存储系统进行查询的 Hive 连接器,参见《用于分布式存储数据源的 Hive 连接器》。
  • 用于湖仓系统中的现代对象存储系统和表格式的连接器,参见《现代分布式存储管理和分析》。
  • 针对非关系型数据源的多个连接器,参见《非关系型数据源》。
  • tpch 和 tpcds 连接器专为提供 TPC 基准数据而设计,参见《Trino TPC-H 和 TPC-DS 连接器》。
  • 用于 Java 管理扩展(JMX)的连接器,参见《Trino JMX 连接器》。

在本章中,您将更多了解这些连接器,这些连接器可以从 Trino 项目中获得。目前,Trino 中提供了超过二十多个连接器,Trino 团队和用户社区还会创建更多连接器。还可以使用商业专有连接器来扩展 Trino 的功能和性能。最后,如果您有自定义数据源,或者没有合适的连接器可用,您可以通过实现必要的 SPI 调用来自定义自己的连接器,并将其放置在 Trino 的插件目录中。

目录和连接器使用的一个重要方面是它们同时适用于 Trino 中的 SQL 语句和查询。这意味着您可以创建跨数据源的查询。例如,您可以将关系数据库中的数据与存储在对象存储后端的文件数据组合起来。这些联合查询将在《Trino 中的查询联邦》中更详细地讨论。

配置

正如在《添加数据源》中所讨论的那样,您想要访问的每个数据源都需要通过创建目录属性文件来配置为目录。文件的名称决定了在编写查询时的目录名称。

必需的属性 connector.name 指示用于该目录的连接器。同一个连接器可以在不同的目录中多次使用,例如,使用相同的技术(如 PostgreSQL)访问不同的关系数据库服务器实例和不同的数据库。或者,如果您有两个 Hive 集群,您可以在单个 Trino 集群中配置两个使用 Hive 连接器的目录,从而可以查询来自两个 Hive 集群的数据。

关系型数据库管理系统(RDBMS)连接器示例:PostgreSQL

Trino 包含对开源和专有的关系型数据库管理系统(RDBMS)的连接器,包括 MySQL、PostgreSQL、Oracle、Amazon Redshift、Microsoft SQL Server 等。Trino 使用这些连接器通过各自的 JDBC 驱动程序查询这些数据源。

让我们以使用 PostgreSQL 的简单示例为例。一个 PostgreSQL 实例可能包含多个数据库。每个数据库包含模式,模式中包含表和视图等对象。在配置 Trino 与 PostgreSQL 的连接时,您选择将哪个数据库暴露为 Trino 中的目录。

在创建一个简单的目录文件,指向服务器上的特定数据库,例如 etc/catalog/postgresql.properties,并重新启动 Trino 之后,您可以获得更多信息。您还可以看到 postgresql 连接器是如何配置所需的 connector.name 的:

connector.name=postgresql
connection-url=jdbc:postgresql://db.example.com:5432/database
connection-user=root
connection-password=secret

您可以列出所有目录以确认新的目录是否可用,并使用 Trino CLI 或使用 JDBC 驱动程序的数据库管理工具(如《Trino 命令行界面》和《Trino JDBC 驱动程序》中所述)来检查详细信息。

SHOW CATALOGS;
  Catalog
------------
 system
 postgresql
(2 rows)

SHOW SCHEMAS IN postgresql;
  Catalog
------------
 public
 airline
(2 rows)

USE postgresql.airline
SHOW TABLES;
  Table
---------
 airport
 carrier
(2 rows)

在这个例子中,您可以看到我们连接到了一个包含两个模式(public 和 airline)的 PostgreSQL 数据库。然后,在 airline 模式下有两个表:airport 和 carrier。让我们尝试运行一个查询。在这个例子中,我们向 Trino 发出一个 SQL 查询,其中表存在于一个 PostgreSQL 数据库中。使用 PostgreSQL 连接器,Trino 能够检索数据进行处理,并将结果返回给用户。

SELECT code, name FROM airport WHERE code = 'ORD';
 code |             name
------+------------------------------
 ORD  | Chicago OHare International
(1 row)

如图 6-1 所示,客户端将查询提交给 Trino 协调器。协调器将工作转移到一个工作节点,该节点使用 JDBC 将整个 SQL 查询语句发送到 PostgreSQL。PostgreSQL JDBC 驱动程序包含在 PostgreSQL 连接器中。PostgreSQL 处理查询并通过 JDBC 返回结果。连接器读取结果并将其写入 Trino 的内部数据格式。Trino 在工作节点上继续处理,将结果提供给协调器,然后将结果返回给用户。

图片

查询下推(Query Pushdown)

正如我们在前面的例子中看到的,Trino 能够通过将 SQL 语句下推到底层数据源中来进行处理。这被称为查询下推(query pushdown)或 SQL 下推。这是有优势的,因为底层系统可以减少返回给 Trino 的数据量,避免不必要的内存、CPU 和网络开销。此外,像 PostgreSQL 这样的系统通常在某些过滤列上有索引,可以加快处理速度。然而,并非总是能够将整个 SQL 语句下推到数据源中。目前,Trino Connector SPI 限制了可以下推到过滤和列投影的操作类型:

SELECT state, count(*)
FROM airport
WHERE country = 'US'
GROUP BY state;

根据前面的 Trino 查询,PostgreSQL 连接器构建将下推到 PostgreSQL 的 SQL 查询:

SELECT state
FROM airport
WHERE country = 'US';

当 RDBMS 连接器推送查询时,有两个重要的地方需要注意。SELECT 列表中的列被设置为 Trino 所需的内容。在本例中,我们只需要 state 列来处理 Trino 中的 GROUP BY。我们还将过滤条件 country = 'US'下推,这意味着我们不需要在 Trino 中对 country 列进行进一步处理。您会注意到聚合操作没有被下推到 PostgreSQL。这是因为 Trino 无法将其他形式的查询下推,而聚合操作必须在 Trino 中执行。这是有优势的,因为 Trino 是一个分布式查询处理引擎,而 PostgreSQL 不是。

如果您确实想将其他处理下推到底层的 RDBMS 源,可以通过使用视图来实现。如果您在 PostgreSQL 中将处理封装在一个视图中,它会作为一个表对 Trino 可见,并在 PostgreSQL 中进行处理。例如,假设您在 PostgreSQL 中创建了这个视图:

CREATE view airline.airports_per_us_state AS
SELECT state, count(*) AS count_star
FROM airline.airport
WHERE country = 'US'
GROUP BY state;

当您在 Trino 中运行 SHOW TABLES 命令时,您会看到这个视图:

SHOW TABLES IN postgresql.airline;
  Table
---------
 airport
 carrier
 airports_per_us_state
(3 rows)

现在您可以直接查询这个视图,所有的处理都会下推到 PostgreSQL,因为对 Trino 来说,这个视图看起来就像一个普通的表:

SELECT * FROM airports_per_us_state;

并行和并发

目前,所有的 RDBMS 连接器都使用 JDBC 与底层数据源建立单一连接。即使底层数据源是一个并行系统,数据也不会并行读取。对于像 Teradata 或 Vertica 这样的并行系统,您需要编写并行连接器,以利用这些系统以分布式方式存储数据的特点。

当从同一 RDBMS 访问多个表时,每个表在查询中都会创建和使用一个 JDBC 连接。例如,如果查询在 PostgreSQL 中执行两个表之间的连接,Trino 会通过 JDBC 创建两个不同的连接来检索数据,如图 6-2 所示。它们并行运行,发送结果,然后在 Trino 中执行连接操作。PostgreSQL 连接器支持连接推送,因此根据连接的具体情况和连接中使用的列的数据类型,连接操作可以被推送到数据库本身进行处理。

图片

如果您希望强制使用底层 PostgreSQL 系统中的查询处理,可以在 PostgreSQL 中创建一个视图,并甚至添加原生索引以进一步提高性能。

其他 RDBMS 连接器

Trino 开源项目包含其他 RDBMS 连接器,如 MySQL、MariaDB、Oracle、PostgreSQL、Amazon Redshift 和 Microsoft SQL Server。这些连接器已经包含在 Trino 的插件目录中,并且可以进行配置。如果您有多个服务器或想要分开访问,可以在 Trino 中为每个实例配置多个目录。您只需要以不同的名称命名*.properties 文件。通常情况下,属性文件的名称确定了目录的名称:

SHOW CATALOGS;
  Catalog
------------
 system
 mysql-dev
 mysql-prod
 mysql-site
(2 rows)

不同的 RDBMS 之间存在一些细微差别。让我们看看它们在目录配置文件中的配置方式。

在 MySQL 中,数据库和模式(schema)之间没有区别,目录文件和 JDBC 连接字符串基本上指向特定的 MySQL 服务器实例:

connector.name=mysql
connection-url=jdbc:mysql://example.net:3306
connection-user=root
connection-password=secret

PostgreSQL 进行了明确的区分,一个服务器实例可以包含多个包含模式的数据库。JDBC 连接指向特定的数据库:

connector.name=postgresql
connection-url=jdbc:postgresql://example.net:5432/database
connection-user=root
connection-password=secret

Amazon Redshift 的目录配置与 PostgreSQL 类似。实际上,Redshift 过去使用的是 PostgreSQL 的 JDBC 驱动,因为它基于开源的 PostgreSQL 代码:

connector.name=redshift
connection-url=jdbc:redshift://example.net:5439/database
connection-user=root
connection-password=secret

Microsoft SQL Server 的连接字符串与 MySQL 字符串类似。然而,SQL Server 确实有数据库和模式的概念,而这个示例只是连接到默认数据库:

connector.name=sqlserver
connection-url=jdbc:sqlserver://example.net:1433
connection-user=root
connection-password=secret

使用不同的数据库(如 sales)需要通过属性进行配置:

connection-url=jdbc:sqlserver://example.net:1433;databaseName=sales

安全

通过将用户名和密码存储在目录配置文件中,以最简单的方式对 RDBMS 连接器进行身份验证。由于 Trino 集群中的机器被设计为受信任的系统,这对于大多数用途来说应该足够了。为了保护 Trino 和连接的数据源的安全性,重要的是确保对机器和配置文件的访问安全。它应该被视为私钥一样对待。所有 Trino 的用户都使用相同的连接访问 RDBMS。

如果您不想以明文形式存储密码,有办法从 Trino 客户端传递用户名和密码。您还可以使用由供应系统注入到环境变量中的用户名和密码值,然后 Trino 可以获取这些密钥。我们在第 10 章中会进一步讨论这个问题。

总之,使用 Trino 与 RDBMS 非常简单,允许您将所有系统集中在一个地方,并同时查询它们。仅仅这个用例就已经为 Trino 用户带来了显着的好处。当然,当您添加更多的数据源和其他连接器时,情况会变得更加有趣。所以让我们继续学习更多内容。

查询透传(Query Pass-Through)

查询透传(Query Pass-Through)是一种在某些 RDBMS 连接器中可用的强大功能。它利用了 Trino 中多态表函数的支持。这些函数是返回表作为结果的 SQL 语句。在查询透传的情况下,这些表函数是 Trino 本身忽略并透传给底层数据源的查询语句。它解析和处理这个查询语句,并返回一个表作为结果。Trino 可以返回这个表,或者在 Trino 的 SQL 查询中进一步处理它。

实际上,这意味着你可以在 Trino 的 SQL 语句中嵌入本地查询语言代码(例如,Oracle 特定的 SQL 片段),该代码返回一个表。这使你可以访问底层数据源的功能,用于遗留系统迁移。它还可以解锁进一步的性能优化或查询方法,例如支持其他查询语言。

Trino TPC-H 和 TPC-DS 连接器

您已经在第 2 章中了解了 TPC-H 连接器的使用方式。现在我们来更详细地了解一下。

TPC-H 和 TPC-DS 连接器已经内置在 Trino 中,并提供了一组模式来支持 TPC Benchmark H(TPC-H)和 TPC Benchmark DS(TPC-DS)。这些来自事务处理性能委员会(TPC)的数据库基准套件是行业标准的数据库系统基准测试,用于评估高度复杂的决策支持数据库的性能。

这些连接器可以用于测试 Trino 的功能和查询语法,而无需配置对外部数据源的访问权限。当您查询一个 TPC-H 或 TPC-DS 模式时,连接器会使用确定性算法实时生成数据。

创建一个目录属性文件(例如,命名为 etc/catalog/tpch.properties)来使用 TPC-H 连接器:

connector.name=tpch

TPC-DS 连接器的配置方式类似,例如使用 etc/catalog/tpcdsdata.properties 文件:

connector.name=tpcds

这两个连接器都公开了在结构上相同的数据集的模式:

SHOW SCHEMAS FROM tpch;
       Schema
--------------------
 information_schema
 sf1
 sf100
 sf1000
 sf10000
 sf100000
 sf300
 sf3000
 sf30000
 tiny
(10 rows)

表 6-1 显示了不同的模式中的事务表(例如订单表)包含越来越多的记录数。

图片

您可以使用这些数据集更多了解 Trino 支持的 SQL,正如第 8 章和第 9 章中讨论的那样,而无需连接到另一个数据库(例如,还可以使用 Docker 容器)。

这些连接器的另一个重要用途是数据的简单可用性。您可以在开发和测试中使用这些连接器,甚至在生产环境中使用 Trino 部署。通过轻松获得大量数据,您可以构建查询,对您的 Trino 集群施加重大负载。 如果您想要使用 TPC 数据集测试对象存储或 RDBMS 的性能,您可以运行 CREATE TABLE AS SELECT 查询,将连接器中的数据复制到另一个目录中。

然后,您可以对特定基础架构中的数据使用相同的 TPC 测试查询。 这样可以更好地了解集群的性能,进行调整和优化,并确保其在时间和版本更新以及其他更改中的性能。

Hive 连接器用于分布式存储数据源

正如你在“Trino 简史”中了解到的那样,Trino 被设计用于在 Facebook 规模下运行快速查询。考虑到 Facebook 在其 Hive 数据仓库中拥有大规模的存储,所以 Hive 连接器是最早与 Trino 一起开发的连接器之一,这是理所当然的。

Apache Hadoop 和 Hive

在了解 Hive 连接器及其对多种对象存储格式的适用性之前,您需要对 Apache Hadoop 和 Apache Hive 的知识有所了解。

如果您对 Hadoop 和 Hive 不太熟悉,并且想要了解更多信息,我们建议您查阅官方项目网站、网络上的视频和其他资源,以及一些优秀的图书。例如,Edward Capriolo 等人的《Programming Hive》(O'Reilly)对我们来说是一本很好的指南。

现在,我们需要讨论一些 Hadoop 和 Hive 的概念,以便为 Trino 的使用提供足够的上下文。

在其核心,Hadoop 由 Hadoop 分布式文件系统(HDFS)和应用软件(例如 Hadoop MapReduce)组成,用于与存储在 HDFS 中的数据进行交互。Apache YARN 用于管理 Hadoop 应用程序所需的资源。Hadoop 是分布式处理大规模数据集的领先系统,可以在一组计算机集群上扩展系统,并在计算机集群之上提供高可用的服务。

最初,通过编写 MapReduce 程序来执行数据处理。它们遵循一种特定的编程模型,可以自然地将数据处理分布在整个集群中。该模型运作良好且稳健。然而,为分析性问题编写 MapReduce 程序很繁琐。对于依赖 SQL 和数据仓库的现有基础架构、工具和用户来说,这种模型也不易转移。

Hive 提供了使用 MapReduce 的替代方法。它被创建为在 Hadoop 之上提供一个 SQL 抽象层,通过类似 SQL 的语法与存储在 HDFS 中的数据进行交互。现在,了解和理解 SQL 的大量用户可以与存储在 HDFS 中的数据进行交互。

Hive 数据以文件形式存储在 HDFS 中,通常称为对象。这些文件使用各种格式,例如 ORC、Parquet 等。这些文件使用 Hive 可以理解的特定目录和文件布局进行存储(例如,分区和桶化表)。我们将这种布局称为 Hive 风格的表格式。

Hive 元数据描述了存储在 HDFS 中的数据如何映射到可通过 Hive 查询语言(以及间接地通过 Trino 和 SQL)查询的模式、表和列。此元数据信息存储在诸如 MySQL 或 PostgreSQL 之类的数据库中,并可通过 Hive 元数据存储服务(HMS)访问。

Hive 运行时提供了类似 SQL 的 Hive 查询语言和分布式执行层来执行查询。Hive 运行时将查询转换为一组可在 Hadoop 集群上运行的 MapReduce 程序。随着时间的推移,Hive 逐渐演变为提供其他执行引擎,如 Apache Tez 和 Spark。

Hadoop 和 Hive 在整个行业广泛使用。随着它们的使用,HDFS 格式已成为许多其他分布式存储系统的支持格式,例如 Amazon S3 和兼容 S3 的存储、Azure Data Lake Storage、Azure Blob Storage、Google Cloud Storage 等。

Hive 连接器

Trino 的 Hive 连接器允许您连接到一个 HDFS 对象存储集群。它利用 HMS 中的元数据,并查询和处理存储在 HDFS 中的数据。

Trino 最常见的用例之一可能是利用 Hive 连接器从分布式存储(如 HDFS 或云存储)中读取数据。

Hive 连接器允许 Trino 从分布式存储(如 HDFS)读取和写入数据。然而,它并不局限于 HDFS,而是设计用于与分布式存储一般的工作。目前,您可以配置 Hive 连接器以与 HDFS、S3、Azure Blob Storage、Azure Data Lake Storage、Google Cloud Storage 和 S3 兼容存储一起工作。S3 兼容存储可以包括 MinIO、Ceph、IBM Cloud Object Storage、SwiftStack、Cloudian、Riak CS、LeoFS、OpenIO 等。只要它们实现了 S3 API 并且行为基本一致,Trino 大部分时间不需要知道差异。

由于 Hadoop 和其他兼容 HDFS 的系统的广泛使用,以及 Hive 连接器支持它们的扩展功能集,可以将其视为使用 Trino 查询对象存储的主要连接器,因此对于许多 Trino 用户来说至关重要。

在架构上,Hive 连接器与 RDBMS 和其他连接器有些不同,因为它根本不使用 Hive 引擎。因此,它无法将 SQL 处理推送给 Hive。相反,它只是使用 HMS 中的元数据,并使用 Hadoop 项目提供的 HDFS 客户端直接访问 HDFS 上的数据。它还假设数据在分布式存储中的组织方式符合 Hive 表格式。

在所有情况下,模式信息从 HMS 中获取,数据布局与 Hive 数据仓库相同。概念是相同的,只是数据存储在除 HDFS 之外的位置。然而,与 Hadoop 不同,这些非 Hadoop 分布式文件系统并不总是具有用于存储 Trino 使用的元数据的 HMS 等效项。为了利用 Hive 样式的表格式,您必须配置 Trino 以使用现有 Hadoop 集群中的 HMS 或您自己的独立 HMS。这可能意味着您使用 AWS Glue 等替代 HMS,或者只运行一个仅包含 HMS 的最小化 Hadoop 部署。

使用 HMS 描述除 HDFS 以外的 blob 存储中的数据允许使用 Hive 连接器查询这些存储系统。这将为 Trino 和任何能够使用 SQL 的工具提供对这些系统中存储的数据的全部查询能力。

配置目录以使用 Hive 连接器需要创建一个具有所需名称的目录属性文件,例如 etc/catalog/s3.properties、etc/catalog/gcs.properties、etc/catalog/minio.properties,甚至只是 etc/catalog/hdfs.properties 或 etc/catalog/objectstorage.properties。在下面的示例中,我们假设使用 etc/catalog/datalake.properties。最低要求是配置连接器名称和 HMS 的 URL:

connector.name=hive
hive.metastore.uri=thrift://example.net:9083

还有许多其他配置属性适用于不同的用例,您很快将更多了解其中的一些。如果有疑问,始终查阅文档;请参阅“文档”。让我们接下来深入了解其中的一些细节。

Hive-Style 表格格式

配置完连接器后,您可以从 Trino 中创建模式,例如在 HDFS 中:

CREATE SCHEMA datalake.web
WITH (location = 'hdfs://starburst-oreilly/web')

模式(有时仍称为数据库)可以包含多个表。您可以在下一节中了解更多关于它们的信息。模式创建通常仅在 HMS 中创建有关模式的元数据:

...
hdfs://starburst-oreilly/web/customers
hdfs://starburst-oreilly/web/clicks
hdfs://starburst-oreilly/web/sessions
...

使用 Amazon S3 并没有太大的区别。您只需使用不同的协议字符串:

CREATE SCHEMA datalake.web
WITH (location = 's3://example-org/web')
...
s3://example-org/web/customers
s3://example-org/web/clicks
s3://example-org/web/sessions
...

如果您注意到与目录结构中的路径相似之处,那是正确的。数据库、模式和表可以像对象存储的用户界面中的嵌套目录一样进行检查,尽管底层设置可能使用或不使用目录。

托管表和外部表

在架构中,我们需要了解关于模式中组织为表的内容。Hive 区分托管表和外部表。托管表由 Hive 管理,因此也可能由 Trino 管理,并且在数据库目录位置下与其数据一起创建。外部表不由 Hive 管理,它明确指向 Hive 管理的目录之外的另一个位置。

托管表和外部表的主要区别在于 Hive(以及可能的 Trino)拥有托管表中的数据。如果删除托管表,HMS 中的元数据和数据都会被删除。如果删除外部表,数据将保留,只有关于表的元数据被删除。

您使用的表类型实际上取决于您计划如何使用 Trino。您可能会将 Trino 用于数据联邦、数据仓库或数据湖,或同时使用两者,或其他混合方式。您需要决定谁拥有数据。可以是 Trino 与 HMS 一起工作,也可以是 Hadoop 和 HMS、Spark 或 ETL 流水线中的其他工具。在所有情况下,元数据都在 HMS 中进行管理。

关于哪个系统拥有和管理 HMS 和数据的决策通常基于您的数据架构。在 Trino 的早期用例中,Hadoop 通常控制数据生命周期。但随着越来越多的用例将 Trino 作为中心工具,许多用户会改变模式,Trino 接管控制权。

一些新的 Trino 用户开始查询现有的 Hadoop 部署。在这种情况下,它更像是一种数据联邦用途,Hadoop 拥有数据。然后,您配置 Hive 连接器以公开 Hadoop 中的现有表供 Trino 查询。在这种情况下,通常不允许 Trino 写入这些表。

其他 Trino 用户可能会开始从 Hadoop 迁移到完全使用 Trino,或者从另一个对象存储系统开始,通常是基于云的系统。在这种情况下,最好通过 Trino 创建数据定义语言(DDL)来让 Trino 拥有数据。

让我们考虑以下 Trino 表的 DDL:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)

在这个例子中,表 page_views 存储在一个名为 page_views 的目录下。这个 page_views 目录要么是 hive.metastore.warehouse.dir 定义的目录下的子目录,要么是在创建模式时定义的不同目录。

这里是一个 HDFS 的例子:hdfs://user/hive/warehouse/web/page_views/...

这是一个 Amazon S3 的例子:s3://example-org/web/page_views/...

接下来,让我们考虑一个指向现有数据的 Trino 表的 DDL。这些数据是通过其他方式创建和管理的,比如通过 Spark 或 ETL 过程,数据被存储在某个位置。在这种情况下,你可以通过 Trino 创建一个外部表,指向这些数据的外部位置:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)
WITH (
  external_location = 's3://starburst-external/page_views'
)

这将插入有关该表的元数据到 HMS 中,包括外部路径和一个标志,用于向 Trino 和 HMS 表明该表是外部表,因此由另一个系统管理。

结果是,位于 s3://example-org/page_views 的数据可能已经存在。一旦在 Trino 中创建了该表,你就可以开始查询它。当你将 Hive 连接器配置到一个现有的 Hive 数据仓库时,你可以看到现有的表,并立即从这些表中进行查询。

或者,你可以在一个空目录中创建表,并期望稍后由 Trino 或外部来源加载数据。在任何情况下,Trino 都期望目录结构已经创建;否则,DDL 将出错。创建外部表的最常见情况是与其他工具共享数据。

分区数据

到目前为止,你已经了解到表的数据,无论是管理表还是外部表,都存储在一个或多个文件中的目录中。数据分区是对此的扩展,它是一种将逻辑表水平划分为较小数据块(称为分区)的技术。

这个概念本身源于关系型数据库管理系统中的分区方案。Hive 引入了这种技术,用于在 HDFS 中对数据进行更好的查询性能和数据管理。

分区现在是分布式文件系统(如 HDFS)和对象存储(如 S3)中的标准数据组织策略。

让我们使用这个表的示例来演示分区:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date
)
WITH (
  partitioned_by = ARRAY['view_date']
)

与非分区表一样,page_views 表的数据位于.../page_views 中。使用分区会改变表的布局结构。对于分区表,会在表的子目录中添加额外的子目录。在下面的示例中,你可以看到按照分区键定义的目录结构:

...
.../page_views/view_date=2019-01-14/...
.../page_views/view_date=2019-01-15/...
.../page_views/view_date=2019-01-16/...
...

Trino 也使用相同的 Hive 风格的表格式。此外,你可以选择在多个列上进行分区:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)
WITH (
  partitioned_by = ARRAY['view_date', 'country']
)

当选择多个分区列时,Trino 会创建一个分层的目录结构:

...
.../page_views/view_date=2019-01-15/country=US…
.../page_views/view_date=2019-01-15/country=PL…
.../page_views/view_date=2019-01-15/country=UA...
.../page_views/view_date=2019-01-16/country=US…
.../page_views/view_date=2019-01-17/country=AR...
...

分区可以提供更好的查询性能,特别是在数据规模增大时。例如,让我们看下面的查询:

SELECT DISTINCT user_id
FROM page_views
WHERE view_date = DATE '2019-01-15' AND country = 'US';

当提交这个查询时,Trino 会识别 WHERE 子句中的分区列,并使用关联的值来只读取 view_date=2019-01-15/country=US 子目录。通过只读取所需的分区,可能会实现大量的性能节省。如果您的数据目前很小,可能无法注意到性能提升。但随着数据的增长,改进的性能将变得显著。

加载数据

迄今为止,您已经了解了 Hive 风格的表格格式,包括分区数据。那么如何将数据加载到表格中呢?这实际上取决于谁拥有数据。让我们先假设您在 Trino 中创建表格并使用 Trino 加载数据:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date,
  country varchar
)
WITH (
  partitioned_by = ARRAY['view_date', 'country']
)

要通过 Trino 加载数据,Trino 支持 INSERT INTO ... VALUES、INSERT INTO ... SELECT 和 CREATE TABLE AS SELECT 这几种方式。虽然 INSERT INTO 存在,但使用有限,因为它每个语句只创建一个文件和一行数据。在学习 Trino 时,使用 INSERT INTO 通常是不错的选择。

INSERT SELECT 和 CREATE TABLE AS 执行相同的功能。您选择使用哪个取决于您是要加载到现有表格还是在加载数据时创建新表格。让我们以 INSERT SELECT 为例,您可能会从一个非分区的外部表格中查询数据并将其加载到 Trino 中的分区表格中:

trino:web> INSERT INTO page_views_ext SELECT * FROM page_views;
INSERT: 16 rows

如果您熟悉 Hive,Trino 执行的是所谓的动态分区:当 Trino 首次检测到没有目录的分区列值时,会创建分区的目录结构。 您还可以在 Trino 中创建外部分区表。假设我们有以下具有数据的 S3 目录结构:

...
s3://example-org/page_views/view_date=2019-01-14/...
s3://example-org/page_views/view_date=2019-01-15/...
s3://example-org/page_views/view_date=2019-01-16/...
...

我们创建外部表的定义:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  view_date date
)
WITH (
  partitioned_by = ARRAY['view_date']
)

现在让我们从中进行查询:

trino:web> SELECT * FROM page_views;
 view_time | user_id | page_url | view_date
-----------+---------+----------+-----------
(0 rows)

发生了什么?尽管我们知道其中有数据,但 HMS 无法识别分区。如果您熟悉 Hive,您可能知道"MSCK REPAIR TABLE"命令用于自动发现所有分区。幸运的是,Trino 也有自己的命令来自动发现和添加分区:

CALL system.sync_partition_metadata(
  'web',
  'page_views',
  'FULL'
)
...

现在您已经添加了分区,让我们再试一次:

trino:web> SELECT * FROM page_views;
        view_time        | user_id | page_url | view_date
-------------------------+---------+----------+------------
 2019-01-25 02:39:09.987 |     123 | ...      | 2019-01-14
 ...
 2019-01-25 02:39:11.807 |     123 | ...      | 2019-01-15
 ...

另外,Trino 还提供了手动创建分区的功能。这通常比较麻烦,因为您需要逐个使用命令来定义每个分区:

CALL system.create_empty_partition(
  'web',
  'page_views',
  ARRAY['view_date'],
  ARRAY['2019-01-14']
)
...

当您希望通过 ETL 流程在 Trino 之外创建分区,并希望将新数据暴露给 Trino 时,添加空分区非常有用。Trino 还支持通过在 DELETE 语句的 WHERE 子句中指定分区列的值来删除分区。在这个例子中,数据保持不变,因为它是一个外部表:

DELETE FROM datalake.web.page_views
WHERE view_date = DATE '2019-01-14'

重要的是要强调,您不必使用 Trino 来管理表和数据,但如果需要,也可以这样做。许多用户利用 Hive 或其他工具来创建和操作数据,只使用 Trino 来查询数据。

文件格式和压缩

Trino 支持许多 Hadoop/HDFS 中常用的文件格式,包括:

  • ORC
  • Parquet
  • Avro
  • JSON
  • TextFile
  • RCText
  • RCBinary
  • CSV
  • SequenceFile

在 Trino 中,最常用的三种文件格式是 ORC、Parquet 和 Avro 数据文件。Trino 针对 ORC、Parquet、RC Text 和 RC Binary 格式的读取器进行了性能优化。

HMS 中的元数据包含了文件格式信息,这样 Trino 在读取数据文件时就知道要使用哪个读取器。在 Trino 中创建表时,默认的文件格式是 ORC。但是,可以在 CREATE TABLE 语句的 WITH 属性中覆盖默认设置,如下所示:

CREATE TABLE datalake.web.page_views (
  view_time timestamp,
  user_id bigint,
  page_url varchar,
  ds_date,
  country varchar
)
WITH (
  format = 'PARQUET'
)

在目录属性文件中,可以使用 hive.storage-format 配置设置所有表创建的默认存储格式。

默认情况下,Trino 使用 GZIP 压缩编解码器来写入文件。可以通过在目录属性文件中设置 hive.compression-codec 配置来更改编解码器为 SNAPPY 或 NONE。

现代分布式存储管理和分析

如前所述,Trino 的核心用例之一是在数据湖上进行快速分析和其他基于 SQL 的工作负载。这些系统基于 Hadoop 对象存储系统和其他兼容的数据存储解决方案,并使用 Hive Metastore Service 来管理元数据。Hive 连接器与这两个系统进行交互。数据湖一直以来被证明是存储大量结构化和半结构化数据的优秀解决方案。

然而,时间和创新并没有停滞不前。数据湖的大规模采用也迅速暴露出许多问题。对于习惯于关系型数据库的用户来说,缺乏 ACID 兼容性是一个遗漏的问题。对于许多用例来说,模式演化被发现是至关重要的,但在处理大规模数据时非常困难甚至不可能实现。在遇到问题的用户中,一些倡议应运而生,并导致了解决这些问题以及其他一些问题的新型解决方案的出现:

  • Delta Lake:由 Databricks 开发,源自 Apache Spark 生态系统。
  • Apache Hudi:最初由 Uber 开发。
  • Apache Iceberg:在 Netflix 开始启动。

这三个系统实现了新的机制,改进了数据湖,并使其适用于分析和其他更复杂的用例,超越了简单的数据存储。这些新的表格格式和系统的部署通常被称为数据湖仓库(data lakehouse),它结合了数据仓库和数据湖的成功特征。

Iceberg 是这些表格格式中的领导者,具有许多强大的功能,包括以下内容:

  • 支持 ACID 事务
  • 没有大规模操作负担的模式演化
  • 快照、时间旅行支持和事务回滚
  • 使用 ORC、Parquet 和 Avro 等文件格式的高效数据存储
  • 借助先进的分区、分桶、压缩、清理和索引等功能实现访问和数据管理的改进
  • 内置且高效的元数据管理
  • 支持变更数据捕获、流式摄取和审计历史

Iceberg 还包括完整的表格格式开放规范,事实上已经发展到第 2 版。Delta Lake 也提供了许多这些功能,而 Hudi 在架构上更受限制。

所有这些功能解决了在 Hadoop/Hive 中经常难以克服的问题。在其他情况下,它们在操作上非常困难或昂贵,涉及时间、性能或存储方面的大缺陷,因此用户决定放弃相关用例,寻找替代方案或接受这些缺陷。

Iceberg 连接器首先在与 Iceberg 社区的紧密合作中创建,并包含在每个 Trino 版本中。它非常功能丰富,支持诸多高级功能,如时间旅行和材料化视图。

Delta Lake 连接器是作为商业版 Trino 发行版 Starburst Enterprise 的一部分创建的,并于 2022 年由 Starburst 捐赠给 Trino 社区。Databricks 和 Starburst 继续与社区改进该连接器。该连接器随每个 Trino 版本一起提供,并支持完整的读写操作和许多高级功能。

Hudi 连接器目前正在与 Hudi 的创建者进行积极开发。到您阅读本书时,初始实现可能已经完成,并且该连接器可在每个 Trino 版本中使用。

连接器的配置和使用与 Hive 连接器类似。您只需要一个像 S3 这样的对象存储文件系统、一个元数据存储和一个用于配置详细信息的目录属性文件。以下是使用 Iceberg 连接器的简单示例:

connector.name=iceberg
hive.metastore.uri=thrift://metastore.example.com:9083

有了这些可替代方案,应该避免创建新的数据湖,而是使用数据湖仓库(lakehouse)。Trino 支持这三种格式。也可以考虑从使用 Hive 连接器访问的现有数据湖进行迁移。

非关系型数据源

Trino 包括用于查询各种非关系型数据源的连接器。这些数据源通常被称为 NoSQL 系统,可以是键值存储、列存储、流处理系统、文档存储和其他系统。

其中一些数据源提供类似 SQL 的查询语言,例如 Apache Cassandra 的 CQL。其他数据源仅提供特定的工具或 API 来访问数据,或者包含完全不同的查询语言,例如 Elasticsearch 中使用的查询领域专用语言。这些语言的完整性通常有限且不标准化。

Trino 对这些 NoSQL 数据源的连接器允许您像处理关系型数据一样运行 SQL 查询。这使您可以使用诸如商业智能工具之类的应用程序,或者允许那些熟悉 SQL 的人查询这些数据源。这包括对这些数据源进行连接、聚合、子查询和其他高级 SQL 功能的使用。

在下一章中,您将更多地了解一些这些连接器:

  • NoSQL 系统,如 Elasticsearch 或 MongoDB - "文档存储连接器示例:Elasticsearch"
  • 流处理系统,如 Apache Kafka - "流处理系统连接器示例:Kafka"
  • 键值存储系统,如 Apache Accumulo - "键值存储连接器示例:Accumulo" 和 Apache Cassandra - "Apache Cassandra 连接器"
  • 使用 Apache Phoenix 连接器的 Apache HBase - "使用 Phoenix 连接到 HBase"

现在我们跳过这些内容,首先讨论一些更简单的连接器和相关内容。

Trino JMX Connector(JMX 连接器)

JMX 连接器可以在目录属性文件(例如 etc/catalog/monitor.properties)中轻松配置使用:

connector.name=jmx

JMX 连接器公开了关于运行 Trino 协调器和工作节点的 JVM 的运行时信息。它使用 Java Management Extensions (JMX) 并允许您在 Trino 中使用 SQL 访问可用的信息。它在监控和故障排除方面特别有用。

该连接器提供了一个历史模式用于历史聚合数据,一个当前模式提供最新信息,以及一个 information_schema 模式用于元数据。 了解更多信息的最简单方法是在 Trino 中使用 SQL 语句来调查可用的表:

[`SHOW TABLES FROM monitor.current;
                Table ------------------------------------------------------------------
 com.sun.management:type=diagnosticcommand
 com.sun.management:type=hotspotdiagnostic
 io.airlift.discovery.client:name=announcer
 io.airlift.discovery.client:name=serviceinventory
 io.airlift.discovery.store:name=dynamic,type=distributedstore
 io.airlift.discovery.store:name=dynamic,type=httpremotestore
 ....`]()

如您所见,表名使用了发出度量指标的类和参数的 Java 类路径。这意味着在 SQL 语句中引用表名时需要使用引号。通常,查找表中可用的列是很有用的:

DESCRIBE monitor.current."java.lang:type=runtime";
        Column         |  Type   | Extra | Comment
------------------------+---------+-------+---------
 bootclasspath          | varchar |       |
 bootclasspathsupported | boolean |       |
 classpath              | varchar |       |
 inputarguments         | varchar |       |
 librarypath            | varchar |       |
 managementspecversion  | varchar |       |
 ...
 vmname                 | varchar |       |
 vmvendor               | varchar |       |
 vmversion              | varchar |       |
 node                   | varchar |       |
 object_name            | varchar |       |
(20 rows)

这样可以让您以漂亮的格式获取信息:

SELECT vmname, uptime, node FROM  monitor.current."java.lang:type=runtime";
          vmname          | uptime  |     node
--------------------------+---------+--------------
 OpenJDK 64-Bit Server VM | 1579140 | ffffffff-ffff
(1 row)

请注意,此查询仅返回一个节点,因为这是一个简单的单个协调器/工作节点安装,如第 2 章所述。JMX 连接器公开有关 JVM 以及 Trino 特定方面的信息。您可以通过查看以 trino 开头的表来开始探索,例如使用 DESCRIBE monitor.current."trino.execution:name=query​execu⁠tion";。 以下是一些值得查看的其他DESCRIBE语句:

DESCRIBE monitor.current."trino.execution:name=querymanager";
DESCRIBE monitor.current."trino.memory:name=clustermemorymanager";
DESCRIBE monitor.current."trino.failuredetector:name=heartbeatfailuredetector";

黑洞连接器(Black Hole Connector)

黑洞连接器(Black Hole Connector)可以轻松地在目录属性文件中进行配置,例如 etc/catalog/abyss.properties

connector.name=blackhole

黑洞连接器(Black Hole Connector)充当任何数据的接收端,类似于 Unix 操作系统中的空设备/dev/null。这使您可以将其用作从其他目录读取的任何插入查询的目标。由于它实际上不会写入任何数据,因此您可以使用它来测量来自这些目录的读取性能。

例如,您可以在黑洞目录(abyss)中创建一个测试模式(schema),并从 tpch.tiny 数据集创建一个表。然后,您可以从 tpch.sf3 数据集中读取数据,并将其插入到 abyss 目录中:

CREATE SCHEMA abyss.test;
CREATE TABLE abyss.test.orders AS SELECT * from tpch.tiny.orders;
INSERT INTO abyss.test.orders SELECT * FROM tpch.sf3.orders;

这个操作实际上是测量了从 tpch 目录读取数据的性能,因为它读取了 450 万个订单记录然后将其发送到 abyss 目录。如果使用像 tpch.sf100 这样的其他模式,数据集的大小会增加。这可以让您评估 Trino 部署的性能。

对于 RDBMS、对象存储或键值存储目录,类似的查询可以帮助进行查询开发和性能测试,并进行改进。

内存连接器

内存连接器可以在目录属性文件(例如 etc/catalog/brain.properties)中进行配置:

connector.name=memory

您可以像临时数据库一样使用内存连接器。所有数据存储在集群的内存中。停止集群会销毁数据。当然,您也可以使用 SQL 语句主动删除表中的数据,甚至完全删除表。

使用内存连接器对于测试查询或临时存储非常有用。例如,我们可以将其用作在使用鸢尾花数据集时无需配置外部数据源的简单替代方案;请参阅“鸢尾花数据集”。

其他连接器

正如您现在所了解的,Trino 项目包含许多连接器,但有时您可能会遇到需要一个特定数据源的额外连接器的情况。 好消息是,您不会陷入僵局。Trino 团队和更大的 Trino 社区不断扩展可用连接器的列表,因此在您阅读本书时,该列表可能比现在更长。

除了 Trino 项目本身之外,还可以从 Trino 项目之外的其他方获取连接器。这包括其他社区成员和 Trino 的用户编写的连接器,他们尚未贡献回来,或者由于某种原因无法贡献回来。 还可以从商业数据库系统的商业供应商获取连接器,因此建议向您想要查询的系统的所有者或创建者询问。Trino 社区还包括商业供应商,如 Starburst,他们捆绑了 Trino,并提供支持和扩展功能,包括额外或改进的连接器。

最后但并非最不重要的是,您必须记住 Trino 是围绕开源项目的友好社区。因此,您可以并且鼓励您查看现有连接器的代码,并根据需要创建新的连接器。理想情况下,您甚至可以与项目一起工作,并将连接器贡献回项目,以便以后进行简单的维护和使用。这将使您接受来自社区中其他用户和贡献者的帮助,并将您的连接器暴露给其他用例,可能会提高连接器的整体质量。

发表回复