4年数据涨万倍,Uber大数据平台四次变迁揭秘
作者:|Reza Shiftehfar
译者:陈亮芬
编辑:Debra
导读:Uber 致力于在全球市场上提供更安全、更可靠的交通工具。为了实现这一目标,Uber 非常依赖于各个层面的数据驱动决策,从在高流量事件期间预测乘客需求,到识别和解决合作司机们在注册过程中的瓶颈。随着时间的推移,想要获取更多见解的需求产生了超过 100PB 的分析数据,这些数据需要通过基于 Hadoop 的大数据平台以最小的延迟来清理、存储并提供服务。自 2014 年以来,我们一直致力于开发一种大数据解决方案,以确保数据的可靠性、可伸缩性和易用性,目前我们正致力于提高平台的速度和效率。
在本文中,我们将深入了解 Uber 的 Hadoop 平台,并讨论下一步如何扩展这个丰富而复杂的生态系统。
第一代:Uber 大数据的开端在
2014 年之前,我们有限的数据量可以塞进一些传统的联机事务处理 (OLTP) 数据库中(例如 MySQL 和 PostgreSQL)。为了利用这些数据,我们的工程师必须单独访问每个数据库或表,如果用户想将不同数据库的数据组合起来,需要自己编写代码。当时,我们还没有对所有存储的数据进行全局访问的需求,也没有这些数据的全局视图。事实上,我们的数据分散在不同的 OLTP 数据库中,总数据大小约为几 TB,访问这些数据的延迟非常短 (通常不到一分钟)。图 1 是我们在 2014 年之前的数据架构概览:
随着 Uber 业务量呈指数级增长(业务量包括 Uber 运营的城市 / 国家数量和每个城市使用该服务的乘客 / 司机数量),传入数据量跟着也增加了,我们需要建立第一代分析数据仓库,以满足访问和分析一个地方所有数据的需求。为了让 Uber 尽可能接近数据驱动,我们需要确保分析师可以在同一个地方访问分析数据。为实现这一目标,我们首先将数据用户分为三大类:
城市运营团队(数千人):这些现场工作人员负责管理和扩展每个市场的运输网络。随着业务扩展到新的城市,就有成千上万个城市运营团队需要定期访问这些数据,以便对司机与乘客的问题做出响应。
数据科学家和分析师(数百人):这些分析师和科学家分布在不同的功能组中,他们需要利用数据来帮助我们为用户提供最佳的运输和交付体验,例如通过预测乘客需求来提供及时的服务。
工程师团队(数百人):整个公司的工程师都专注于构建自动数据应用程序,比如欺诈检测和新司机准入平台。
我们的第一代分析数据仓库专注于将 Uber 的所有数据聚合在一个地方以及简化数据访问。对于前者,我们决定使用 Vertica 作为我们的数据仓库软件,因为它速度快、可扩展,而且是面向列的。我们还开发了多个临时 ETL 作业,这些作业将来自不同数据源(即 AWS S3、OLTP 数据库、服务日志等)的数据复制到 Vertica 中。为了实现后者,我们将 SQL 作为我的解决方案接口,同时构建了一个联机查询服务来处理用户查询,并将这些查询需求提交给底层查询引擎。图 2 描述了我们的分析数据仓库:
第一个数据仓库服务的发布是一个巨大的成功。这个服务使得数据用户第一次拥有了全局视图,可以从一个地方访问所有数据。这一新的突破使得大量新团队使用数据分析作为其技术和产品决策的基础。在几个月内,我们的分析数据量增长到数十 TB,用户数量增加到数百个。
SQL 作为简单的标准接口来使用,使城市运营者们能够轻松地实现数据交互,无需了解底层技术。此外,不同的工程师团队开始构建针对用户个性化需求定制的服务和产品,这些服务和产品通过这些数据(即 uberPool、前期定价等)得到所需信息。与此同时,陆续组建新的团队以便更好地使用和提供这些数据(即我们的机器学习和实验团队)。
限制
另一方面,随着数据仓库和传入数据的广泛使用,我们也逐渐看到了一些限制。由于数据是通过临时 ETL 作业摄取的,而我们缺乏正式的模式通信机制,这导致数据可靠性成为一个问题。我们的大多数源数据都是 JSON 格式,并且摄取作业无法弹性应对生产者的代码变更。
随着公司的发展,数据仓库扩展变得越来越昂贵。为了降低成本,我们开始删除过时的数据,以释放出空间留给新数据。除此之外,我们大数据平台的很大一部分不能水平扩展,因为我们之前的主要目标是满足对集中数据的访问,根本没有足够的时间来确保所有部分都是水平可扩展的。我们的数据仓库实际上被用作数据湖,堆积所有原始数据,执行数据建模和提供数据。
此外,由于生成数据的服务与下游数据消费者之间缺乏正式合约,将数据摄入至数据仓库的 ETL 作业非常不稳定(使用灵活的 JSON 格式导致源数据缺乏模式约束)。如果不同的用户在摄取期间执行不同的转换,可能会多次摄取同一批数据。这对上游数据源(即联机存储数据)造成了额外的压力,也影响了它们的服务质量。另外,这导致我们的仓库中存储了相同数据的几个副本,进一步增加了存储成本。至于数据质量,由于 ETL 作业是临时的,并且依赖数据源,而且数据投射和转换是在摄取期间进行的,所以回填非常耗费时间和精力。由于摄取工作缺乏标准化,我们也很难摄取任何新的数据集和类型。
第二代:Hadoop 的到来
为了解决这些限制,我们围绕 Hadoop 生态系统重新构建了大数据平台。具体来说,我们引入了一个 Hadoop 数据湖,所有的原始数据仅需要从不同的联机数据存储中摄取一次,并且在摄取期间没有进行转换。这种设计转变明显降低了联机数据存储的压力,让我们从临时摄取作业转变为可扩展的摄取平台。为了让用户能够访问 Hadoop 中的数据,我们引入了 Presto 来实现交互式临时用户查询,引入 Apache Spark 来促进对原始数据(包括 SQL 和非 SQL 两种格式)的编程访问,并将 Apache Hive 作为主力来应对非常大的查询。这些不同的查询引擎让用户可以获得最能满足其需求的工具,这也使得我们的平台变得更加灵活和易于访问。
为了保持平台的可扩展性,我们确保所有数据建模和转换仅在 Hadoop 中进行,从而在出现问题时能够进行快速的回填和恢复。只有最关键的建模表(城市运营者们实时利用这些表来实现快速的 SQL 查询)才被转移到我们的数据仓库中。这大大降低了数据仓库的运营成本,同时还将用户引导到基于 Hadoop 的查询引擎。
我们还利用了 Apache Parquet 这个标准的列式文件格式,提高了压缩率,从而节省了存储空间,同时因为分析查询采用了列式访问,从而获得了计算资源收益。此外,Parquet 与 Apache Spark 的无缝集成使得该解决方案成为访问 Hadoop 数据的流行选择。图 3 总结了我们的第二代大数据平台的架构:
除了整合 Hadoop 数据湖之外,我们还让生态系统中的所有数据服务都可以水平扩展,从而提高了大数据平台的效率和稳定性。这种通用的水平可扩展性可以满足直接的业务需求,让我们能够集中精力构建下一代数据平台,而不是解决临时问题。
与第一代平台不同的是,数据管道易受上游数据格式变更影响的问题将不复存在,我们的第二次迭代使得我们可以对所有数据进行模式化,从 JSON 转换到 Parquet,将模式和数据存储在一起。为此,我们构建了一个集中模式服务来收集、存储和提供模式,还提供了不同的客户端库,用于将不同的服务与这个集中式服务集成在一起。不稳定的临时数据摄取作业被标准平台替换,以便将原始嵌套格式的源数据传输到 Hadoop 数据湖中。
随着 Uber 的业务继续以闪电般的速度扩展,我们很快就拥有了数十 PB 的数据。而且每天都有数十 TB 的新数据被添加到数据湖中,我们的大数据平台增长到超过 10000 个 vcore,任何一天都有超过 100000 个批处理作业。这也使得我们的 Hadoop 数据湖成为所有分析 Uber 数据的事实来源。
限制
随着公司的不断扩展,我们的生态系统中存储了数十 PB 的数据,我们不得不面临一系列新的挑战。
首先,由于需要摄取更多数据以及有更多用户编写临时批处理作业(生成更多输出数据),存储在 HDFS 中的大量小文件开始给 HDFS NameNode 带来额外的压力。最重要的是,数据延迟仍然远远满足不了我们的业务需求。用户 24 小时内只能访问一次新数据,这对于需要做出实时决策的用户来说太慢了。虽然将 ETL 和建模迁移到 Hadoop 使得处理过程更具可扩展性,但因为这些 ETL 作业每次运行时都必须重新创建整个建模表,处理过程仍然存在瓶颈。除此之外,新数据的摄取和相关派生表的建模都基于整个数据集的快照,然后通过交换旧表和新表让用户能够访问新数据。摄取作业必须返回源数据存储,创建新快照,然后在每次运行期间将整个数据集摄取或转换为可消费的 Parquet 文件。随着我们的数据存储量的增长,运行 1000 个多 Spark 作业可能需要 20 多个小时。
每个作业的很大一部分都涉及将最新的快照转换为历史数据和新数据。虽然每个表每天只添加 100 多 GB 新数据,但摄取作业每次在运行时都必须转换 100 多 TB 的数据集。对于每次运行时都会重新创建新派生表的 ETL 和建模作业来说也是如此。由于历史数据的更新比例很高,这些作业必须依赖基于快照的源数据摄取。从本质上讲,我们的数据包含许多更新操作(例如乘客和司机评级、支持在行程结束后的几个小时甚至几天内的票价调整)。由于 HDFS 和 Parquet 不支持数据更新,所有摄取作业都需要从更新的源数据创建新快照,再将新快照摄取到 Hadoop 中,并转换为 Parquet 格式,然后交换输出表以查看新数据。图 4 总结了这些基于快照的摄取在我们的大数据平台中是如何流转的。
第三代:长期重建我们的大数据平台
到 2017 年初,整个公司的工程和运营团队都在使用我们的大数据平台,让他们能够从一个地方访问新数据或历史数据。用户可以通过 UI 门户(根据他们的个性化需求进行定制)轻松访问 Hive、Presto、Spark、Vertica、Notebook 和其他数据仓库中的数据。我们的 HDFS 中有 100 多 PB 的数据,计算集群中有 100000 多 vcore,每天有 100000 个 Presto 查询,10000 个 Spark 作业,以及 20000 个 Hive 查询,我们的 Hadoop 架构因此遇到了扩展瓶颈,很多服务受到了数据延迟的影响。
幸运的是,由于我们的底层基础设施可以水平扩展,以满足当前的业务需求,因此我们有足够的时间来研究数据内容、数据访问模式和用户特定需求,以便在构建下一代数据平台之前找出亟待解决的问题。我们发现了以下四个主要的痛点:
• HDFS 可扩展性限制:很多依赖 HDFS 扩展其大数据基础设施的公司都面临这个问题。从设计上来看,HDFS 的瓶颈是 NameNode 容量,因此存储大量小文件会对性能产生显著影响。当数据大小超过 10PB 时就会遇到扩展瓶颈,当数据大小超过 50-100PB 以后这个限制将会成为一个真正的问题。幸运的是,有一些相对简单的解决方案可以将 HDFS 从几十个 PB 扩展到几百个 PB,例如利用 ViewFS 和 HDFS NameNode Federation。通过控制小文件的数量,并将数据的不同部分移动到单独的集群(例如将 HBase 和 Yarn 的应用程序日志移动到一个单独的 HDFS 集群中),HDFS 的可扩展性问题得到了一定缓解。
• Hadoop 中更快的数据: Uber 的业务是实时运营的,所以我们的服务需要访问尽可能新鲜的数据。因此,在很多实际运营场景中,24 小时的数据延迟太慢了,这些实际场景对更快的数据传输存在巨大需求。我们的第二代大数据平台所采用的基于快照的摄取方法效率低下,导致我们不能以较低的延迟摄取数据。为了加快数据交付速度,我们不得不重新设计管道,以便只进行更新数据和新数据的增量摄取。
• 支持 Hadoop 和 Parquet 的更新和删除: Uber 的数据包含很多更新,变化范围从过去几天(例如乘客或者合作司机调整最近一次行程的价格)到过去几周(例如乘客开启新一次行程前对上一次行程进行打分)甚至过去几个月不等(例如由于业务需要回填或调整过去的数据)。通过基于快照的数据摄取,我们每 24 小时摄取一次源数据的新副本。换句话说,我们一次摄取所有更新,每天摄取一次。但是,由于需要摄取更新的数据和增量,我们的解决方案必须能够支持现有数据的更新和删除操作。然而,由于我们的数据存储在 HDFS 和 Parquet 中,无法直接支持对现有数据的更新操作。另一方面,我们的数据包含非常宽的表(每个表大约有 1000 个列),这些表有五个及五个以上级别的嵌套,而用户查询通常只需接触其中一些列,这导致我们不能以高效的方式使用非列式格式。为了使我们的大数据平台能够实现长期增长,我们必须找到一种方法来解决 HDFS 文件系统的这种限制,让它也可以支持更新 / 删除操作。
• 更快的 ETL 和建模:与原始数据摄取类似,ETL 和建模作业也基于快照,需要我们的平台在每次运行时重建派生表。为减少建模表的数据延迟,ETL 作业也需要利用增量。这要求 ETL 作业只需从原始源表中逐步地提取已更改的数据,并更新先前派生的输出表,而不是每隔几小时重建整个输出表。
引入 Hudi
考虑到上述要求,我们构建了 Hadoop Upserts anD Incremental (Hudi),这是一个开源的 Spark 库,在 HDFS 和 Parquet 之上提供抽象层,以支持所需的更新和删除操作。Hudi 可以被用在任意的 Spark 作业中,可以水平扩展,并且只依赖 HDFS。因此,任何需要支持历史数据更新 / 删除操作的大数据平台都可以使用 Hudi。
Hudi 使我们能够在 Hadoop 中更新、插入和删除现有的 Parquet 数据。此外,Hudi 使得数据用户可以只需逐步提取更改的数据,显著提升了查询效率,实现派生建模表的增量更新。
我们 Hadoop 生态系统中的原始数据是根据时间划分的,任何旧日期分区的数据都可能在之后接收到更新。因此,对于依赖这些原始源数据表的数据用户或 ETL 作业,了解哪个日期分区包含更新数据的唯一方法是扫描整个源表,并根据一些已知的时间概念来过滤不需要的记录。这涉及了计算成本非常高昂的查询,需要完全扫描源表,而且不能频繁地运行 ETL 作业。
使用 Hudi,用户很容易知道他们的最后一个检查点时间戳,并获取所有已更新的记录,无论这些更新是添加到最近日期分区的新记录,还是对旧数据的更新,都无需运行昂贵的查询来扫描整个源表。
使用 Hudi 库,我们能够从基于快照的原始数据摄取转换成增量摄取建模,这一转换使我们能够将数据延迟从 24 小时减少到一小时之内。图 5 描述了整合 Hudi 后的大数据平台:
通用数据摄取
Hudi 并不是第三代大数据平台的唯一变化。我们还通过 Apache Kafka 实现了存储和大数据团队之间的上游数据存储变更的交接。上游数据存储事件(以及来自不同应用程序和服务的日志消息)使用统一的 Avro 编码流入 Kafka,包括附加的标准全局元数据头部信息(比如时间戳、row key、版本、数据中心信息和源主机)。数据流团队和大数据团队都使用这些存储变更日志事件作为其源输入数据以进行进一步处理。
我们的数据摄取平台 Marmaray 以小批量的方式运行,并从 Kafka 获取上游的存储变更日志,通过 Hudi 库将这些日志应用在 Hadoop 的现有数据上。如之前所述,Hudi 支持更新插入操作,允许用户添加新记录和更新或删除历史数据。Spark 摄取作业每 10-15 分钟运行一次,在 Hadoop 中提供 30 分钟的原始数据延迟(具有 1-2 个摄取作业失败或重试的余量)。为避免因多次将相同的源数据摄取到 Hadoop 而导致效率低下,我们不允许在原始数据摄取期间进行任何转换,所以我们决定将原始数据提取框架变成 EL 平台,而不是传统的 ETL 平台。这个模型鼓励用户在上游数据以原始嵌套格式流入后,在 Hadoop 中以批处理模式执行所需的转换操作。
自从对我们的大数据平台实施这些更改以来,我们通过避免不必要或低效的摄取操作有效地节省了大量的计算资源。由于我们现在可以避免在摄取过程中进行容易出错的转换,原始数据的可靠性也得到了显著提升。现在,用户可以使用任何大数据处理引擎在原始源数据上运行转换。此外,一旦出现任何问题,用户可以再次重新运行转换,并通过使用更多计算资源和更高程度的并行性来更快地完成批转换作业,从而保证 SLA。
增量数据建模
考虑到需要将大量上游数据存储摄入到 Hadoop 中(截至 2017 年有超过 3000 个原始 Hadoop 表),我们还构建了一个通用的摄取平台,以便以统一和可配置的方式将原始数据摄入到 Hadoop 中。现在,我们的大数据平台以增量的方式逐步更新原始 Hadoop 表,数据延迟为 10-15 分钟,实现了对源数据的快速访问。但是,为了确保建模表也具有低延迟,我们必须避免建模 ETL 作业中的低效率操作(比如避免完全重新创建派生表或进行完整的源原始表扫描)。实际上,Hudi 允许 ETL 作业仅从源表中获取已更改的数据。建模作业只需要在每次迭代运行期间告知 Hudi 阅读器检查点时间戳,就可以从原始源表 (不管实际记录存储在哪个日期分区) 接收到一组新的或更新过的记录。
在 ETL 作业期间使用 Hudi Writer 使我们能够更新派生建模表中的旧日期分区,而无需重新创建整个分区和表。因此,我们的建模 ETL 作业使用 Hudi Reader 逐步从源表中获取已更改的数据,并使用 Hudi Writer 逐步更新派生的输出表。现在,ETL 作业也能在不到 30 分钟内完成,Hadoop 的所有派生表的端到端延迟减少到一个小时以内。
为了向 Hadoop 表的用户提供访问所有数据或仅访问新数据或更新数据的不同选项,使用 Hudi 存储格式的 Hadoop 原始表提供了两种不同的读取模式:
• 最新模式视图:在当前时间点提供整个 Hadoop 表的整体视图。这个视图包括所有记录的最新合并值以及表中的所有现有记录。
• 增量模式视图:仅根据给定时间戳从特定 Hadoop 表中获取新记录和更新记录。这个视图仅返回自最近检查点以来最近插入的以及已更新的行。此外,如果特定行自上一个检查点以来被多次更新,则将返回更新过程中所有更改的值(而不是仅返回最新的合并行)
图 6 描述了所有 Hadoop 表(以 Hudi 文件格式存储) 的这两个读取视图:
标准化数据模型
除了提供同一个表的不同视图外,我们还标准化了数据模型,为所有原始 Hadoop 数据提供两种类型的表:
• 变更日志历史记录表:包含特定上游表的所有变更日志历史记录。这个表让用户能够扫描给定表的变更历史记录,并且可以按照 key 进行合并,以提供每一行的最新值。
• 快照合并表:包含最新的上游表的合并视图。这个表包含所有历史变更日志的压缩合并视图。
图 7 描述了如何使用给定变更日志流为特定上游源数据存储生成不同的 Hive 原始表:
但是,变更日志流不一定包含给定 key 的整个行(所有列)。虽然合并的快照表始终提供特定 key 的所有列,但如果变更日志的上游流仅提供部分行变更日志,变更日志历史表可能就比较稀疏。如果用户希望从变更日志历史记录表中获取更改的值,并将其与合并的快照表相连以创建完整的数据行,我们还会在变更日志历史记录表的合并快照表中包含相同 key 的日期分区。通过避免合并快照表的完整表扫描,使得两个表能够更有效地进行跨分区连接。
图 8 总结了我们大数据平台的不同组件之间的关系:
第四代:下一步该怎样优化?
自 2017 年推出第三代大数据平台以来,整个公司的用户已经可以快速可靠地访问 Hadoop 中的数据,但仍然有提升的空间。我们正在努力增强我们的大数据平台,以改善数据质量、数据延迟,提升效率、可扩展性和可靠性等。
数据质量
为了提高数据质量,我们确定了两个需要改进的关键领域。首先,当某些上游数据存储没有强制执行或检查数据模式时(比如值为 JSON blob 的键值对),我们希望能够避免出现非相容模式的数据。不然不良数据会进入到我们的 Hadoop 生态系统,从而影响所有依赖这些数据的下游用户。为了防止不良数据的流入,我们正在转换所有上游数据存储,对数据内容执行强制性模式检查,并在数据出现任何问题时拒绝数据进入(比如未经模式的确认)。
我们发现的第二个问题是实际数据内容的质量。虽然使用模式可以确保数据包含正确的数据类型,但它们不会检查实际的数据值 (比如一个整数,但不是 0 到 150 之间的正数)。为了提高数据质量,我们正在扩展模式服务以支持语义检查。这些语义检查 (也就是指 Uber 特定数据类型) 使得我们能够在基本结构类型检查之外对实际数据内容添加额外的限制。
数据延迟
我们的目标是将 Hadoop 中的原始数据延迟减少到五分钟,将建模表的数据延迟减少到十分钟。这将使得更多用例从流式处理转向使用 Hudi 增量数据摄取的更有效的小批量处理。
同时,我们还在扩展我们的 Hudi 项目,以支持其他视图模式,其中包括现有的读取优化视图,以及显示数据延迟仅几分钟的实时视图。这个实时视图依赖于我们称之为 Merge-On-Read 或 Hudi 2.0 的开源解决方案(也是 Hudi 的一部分)。
数据效率
为了提高数据效率,我们不再依赖专用硬件来实现任何服务,转向了服务容器化。此外,我们统一了所有资源调度程序和 Hadoop 生态系统,在整个公司内部搭建 Hadoop 和非数据服务之间的桥梁。这样我们就可以统一调度所有作业和服务,不用管它将运行在什么样的介质中。随着 Uber 的发展,数据位置将成为 Hadoop 应用程序的一大关注点,一个成功的统一资源管理器可以将所有现有的调度程序集中在一起(比如 Yarn、Mesos 和 Myriad)。
可扩展性和可靠性
在努力改善平台可扩展性和可靠性的过程中,我们发现了与可能的边界情况相关的几个问题。虽然我们的摄取平台是基于通用的可插拔模型,但实际摄取上游数据仍然包括了许多依赖于源的管道配置,使得摄取管道并不稳定,并且增加了维护这数千个管道的成本。
为了确保我们可以在不用管数据来源的情况下统一数据摄取,我们与 Uber 的数据存储团队合作启动了一个项目,在不管技术构成的情况下,统一所有上游数据源的变更日志的内容、格式和元数据。该项目将确保有关这些特定上游技术的信息只是添加到实际变更日志值的附加元数据(而不是产生形成不同的变更日志内容或不同数据源的元数据),并且确保无论上游源是什么,都要进行数据摄取。
最后,下一版 Hudi 将帮助我们在几分钟内为所有数据源生成更大的 Parquet 文件(相比于我们当前的 128MB 来说,将超过 1GB)。它还将消除更新与插入的比值带来的任何敏感性。Hudi 1.0 依赖 copy-on-write 技术,只要有更新的记录,它就会重写整个源 Parquet 文件。这明显增大了写入量,特别是当更新与插入比率增加,阻止在 HDF 中创建更大的 Parquet 文件时。Hudi 的新版本旨在克服这个限制,将更新的记录存储在单独的增量文件中,并基于给定策略将其与基础 Parquet 文件异步合并。如果将 Hadoop 数据存储在较大的 Parquet 文件中以及更可靠的源独立数据摄取平台,我们的分析数据平台将在未来几年随着业务的发展而继续增长。
英文原文:
https://eng.uber.com/uber-big-data-platform/
时间:2018-10-31 09:03 来源: 转发量:次
声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。
相关文章:
相关推荐:
网友评论: