apache

apache

基于服务的分布式事务(上)

微服务蜂巢 发表了文章 • 0 个评论 • 734 次浏览 • 2019-02-26 10:19 • 来自相关话题

传统数据库事务 在传统单体应用架构下,我们通常会将业务数据存储在一个数据库中,应用各模块直接对数据库进行操作业务数据。由数据库提供基于ACID的事务保证。 A是Atomic 原子性:事务作为整体来执行,要么 ...查看全部
传统数据库事务

在传统单体应用架构下,我们通常会将业务数据存储在一个数据库中,应用各模块直接对数据库进行操作业务数据。由数据库提供基于ACID的事务保证。

  • A是Atomic 原子性:事务作为整体来执行,要么全部执行,要么都不执行。
  • C是Consistency 一致性:事务应确保数据从一个一致的状态转变为另一个一致的状态。
  • I是 Isolation 隔离性:多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
  • D是Durability 持久性:已提交的事务修改数据会被持久保持。
例如一个电商的下单操作,就涉及到用户系统、库存系统、支付系统以及配送系统等一系列的协同操作。我们在执行下单操作的过程中,如果出现库存短缺,或者用户账户余额不足的情况,这个下单操作就会涉及到一系列的相关业务系统调用。如果这些子系统连接同一个数据库,我们可以通过数据库提供的事务原子性机制将库存数量校验以及用户余额校验的工作,和执行具体的下单业务操作组合成为一个数据库事务操作。通过数据库事务原子性来保证系统各个模块的调用要么都成功,要么都失败(取消)。 同时,由于数据库提供一致性和持久性保证,保证了如果事务执行成功并提交,本次业务操作的数据在立即生效的同时不会产生异议。 同时数据库提供了不同级别的数据锁机制保证应用多个线程同时读取或者更新数据的过程中不会相互影响,从而来保证业务操作的隔离性。 微服务的分布式事务 随着微服务架构的流行,很多大型的业务流程被拆分成为了多个功能单一的基础服务,大家会根据业务的诉求在这些基础服务之上编写一些组合调用服务来满足业务诉求。为了保证微服务能够独立开发部署运行,通常我们会采用一个微服务对应一个数据库的架构,将内部数据经微服务封装之后,以服务的方式对外暴露。这样以往基于数据库来实现的数据操作,就变成了多个对外提供服务的微服务系统的协同完成操作。因为单个微服务只知道自己的服务执行情况,为了保证分布事务的一致性,参与分布式事务的微服务通常会依托协调器完成相关的一致性协调操作。在十多年前分布式事务的实现方案有CORBA的 Object Transaction Service(OTS)、J2EE的 Java Transaction API 以及 Java Transaction Service。这些事务管理以及事务服务的技术都是建立在ACID事务的概念上的。协调器依托于底层的资源交互协议实现资源的占用以及提交的操作,通过两阶段提交的方式实现分布式事务的强一致操作。两阶段提交将分布式事务操作分为准备和提交两个阶段:系统在准备解决阶段完成资源操作, 如果准备阶段中出现问题,支持回滚操作,但是在提交阶段是不允许出错的。两阶段在保证事务原子性上做了很多工作,但是两阶段提交最大的问题是在分布式事务执行过程中, 所有参与事务的节点资源都是被锁定的,系统不允许其他节点访问锁定的资源,在这种执行下很难进一步提升系统的执行效率。如上所述,在ACID的事务执行过程中,为了保证事务的隔离性,通常我们会采用读写加锁的方式,通过串行处理数据方式,保证多个事务在同时执行的过程中不会相互影响。也就是说只有当事务提交并且保存修改记录或者回退取消修改记录之后,其他的事务才能继续执行。然而对于由多个事务组成的长时间运行的事务来说,如果在整个事务的执行过程都采用这种机制来保证事务的隔离性是一种很低效的解决方案。那我们有什么办法即提高系统运行效率,又能保证事务的数据一致性呢?答案是采用补偿的方式来解决这一问题。 基于补偿的事务实现 补偿是指我们将一个事务分成一个本地执行的正常操作事务和一个逻辑上对之前的操作进行补偿的事务。这样采用补偿事务的方式,我们可以把一个长时间运行的事务分化成若干个可以立即提交的本地事务调用,而不是一个长时间占用锁资源的巨型事务。 这样做的最大好处就是极大降低锁占用的时间。作为代价,补偿方式的取消操作和以往的实现方式有很大的不同,我们需要执行一个单独的ACID事务来完成对之前已提交的事务的逻辑补偿。下图展示了一个典型的分布式事务调用, 用户请求触发事务初始服务, 事务初始服务会顺序调用两个事务参与服务(服务A,服务B)。由于这两个事务参与服务之间没有联系,当事务参与服务执行出现了问题,需要一个协调器参与相关的恢复操作。
01.jpg
这里我们可以根据补偿执行的不同将其分成两组不同的补偿方式:不完美补偿 - 反向操作会留下之前原始事务操作的痕迹,一般来说我们是会在原始事务记录中设置取消状态。完美补偿 - 反向逻辑会彻底清理之前的原始事务操作,一般来说是不会保留原始事务交易记录,用户是感知不到事务取消之前的状态信息的。对于采用不完美的补偿方式的系统(Saga实现)来说:我们的补偿事务逻辑和其他的事务逻辑相比没有什么不同, 系统只需要像执行其他业务逻辑一样执行相关的补偿操作即可,无需设置特殊的处理逻辑来恢复事务执行之前的状态。以我们常见的银行ATM取款业务为例,银行账户预先进行扣减的操作,如果取款不成功,其逻辑恢复操作就是通过冲正的方式将预先扣减的款项打回到用户账户,我们可以通过查看账户的交易记录找到扣减和冲正的记录信息。下图展示的内容就是当初始服务调用分别调用服务A和服务B,服务B执行出现错误,这个时候我们事务协调器会调用服务A的冲正方法将系统状态恢复到执行服务调用之前的状态。对于采用完美补偿方式的系统(Try-Cancel/Confirm实现)来说:为了让系统能够在补偿操作彻底清除事务执行的情况,我们会借助两阶段提交协议来完成这部分的功能。在TCC方式下,cancel补偿显然是在第二阶段需要执行业务逻辑来取消第一阶段产生的后果。try是在第一阶段执行相关的业务操作,完成相关业务资源的占用,例如预先分配票务资源,或者检查并刷新用户的账户信用额度。在cancel阶段释放相关的业务资源,例如释放预先分配的票务资源或者恢复之前占用的用户信用额度。那我们为什么还要加入confirm操作呢?这需要从业务资源的使用生命周期来入手。在try过程中,我们只是占用的业务资源,相关的执行操作只是出于待定状态,只有在确认操作执行完毕之后,业务资源才能真正被确认。例如订票业务的try操作,我们只是占用了相关的票务资源。目的是防止票务资源被其他用户占用,但是业务还没有执行完毕,票务提供方还不能将被占用的票务资源统计为已售出票务。 只有相关票务资源被确认售出的之后,票务提供方才能将其统计为已售出票务资源。 ServiceComb Pack架构介绍 通过上面的分析我们可以发现一个有意思的现象,每一步事务的操作都有可能会根据业务的执行情况提供一个补偿操作,通过一个事务管理系统来协调这个补偿操作可以帮我们大大降低业务流程建模的复杂度。在分布式事务实现过程中, 协调器的作用非常重要, 各个事务的参与方需要跟协调器建立好良好的沟通, 由协调器统一调度完成相关事务的执行或者取消的操作。ServiceComb Pack架构如下图所示,主要包含两个组件,即Alpha和Omega,其中:
02.png
Alpha充当协调者的角色,主要负责对事务的事件进行持久化存储以及协调子事务的状态,使其最终得以与全局事务的状态保持一致,即保证事务中的子事务要么全执行,要么全不执行。Omega是微服务中内嵌的一个agent,负责对监控本地事务执行情况并向Alpha上报事务执行事件,并在异常情况下根据alpha下发的指令执行相应的补偿或重试操作。Omega可以通过向Alpha发送消息的方式向Alpha实时传递事务执行的进展,但是Alpha怎么知道这些Omega上传的消息是相互关联的呢?我们通过在服务调用过程中插入唯一的全局事务ID,并在后续的调用其它服务过程中传递这个全局事务ID。通过全局事务ID可以从汇总到Alpha事件中找到事件与之相关联的所有事件,通过对这些事件信息进行分析,我们可以完整地追踪到与分布式事务执行情况。
03.jpg
Omega会以切面编程的方式向应用程序注入相关的处理模块,帮助我们构建分布式事务调用的上下文。Omega在事务处理初始阶段处理事务的相关准备的操作,并在事务执行完毕后做一些清理的操作,例如创建分布式事务起始事件,以及相关的子事件,根据事务执行的成功或者失败生成相关的事务终止或者失败事件。这样带来的好处是用户的代码只需要添加几个annotation来描述分布式事务执行范围,以及与本地的事务处理恢复的相关函数信息,Omega就能通过切面注入的代码追踪本地事务的执行情况。Omega会将本地事务执行的情况以事件的方式通知给Alpha。由于单个Omega不可能知晓一个分布式事务下其他参与服务的执行情况,如此一来,就需要Alpha扮演一个十分重要的协调者的角色。Alpha将收集到的分布式事务事件信息整理汇总,通过分析这些事件之间的关系可以了解到分布式事务的执行情况。Alpha通过向Omega下发相关的执行指令由Omega执行相关提交或恢复操作,实现分布式事务的最终一致性。
04.jpg
在了解过Pack实现的部分细节之后, 我们可以从下图进一步了解ServiceComb Pack架构下,Alpha与Omega内部各模块之间的关系图。
05.jpg
整个架构分为三个部分:[list=1]
  • 一个是Alpha协调器(支持多个实例提供高可用支持)。
  • 二是注入到微服务实例中的Omega。
  • 三是Alpha与Omega之间的交互协议。
  • 目前ServiceComb Pack支持Saga 以及TCC两种分布式事务协调协议实现。Omega包含了与分析用户分布式事务逻辑相关的模块:
    • 事务注解模块(Transaction Annotation)
    • 事务拦截器(Transaction Interceptor)
    • 分布式事务执行相关的事务上下文(Transaction Context)
    • 事务回调(Transaction Callback)
    • 事务执行器(Transaction Executor)
    • 以及负责与Alpha进行通讯的事务传输(Transaction Transport)模块。


    事务注解模块是分布式事务的用户界面,用户将这些标注添加到自己的业务代码之上用以描述与分布式事务相关的信息,这样Omega就可以按照分布式事务的协调要求进行相关的处理。如果大家扩展自己的分布式事务,也可以通过定义自己的事务标注来实现。



    事务拦截器这个模块我们可以借助AOP手段,在用户标注的代码基础上添加相关的拦截代码,获取到与分布式事务以及本地事务执行相关的信息,并借助事务传输模块与Alpha进行通讯传递事件。

    事务上下文为Omega内部提供了一个传递事务调用信息的一个手段,借助前面提到的全局事务ID以及本地事务ID的对应关系,Alpha可以很容易检索到与一个分布式事务相关的所有本地事务事件信息。



    事务执行器主要是为了处理事务调用超时设计的模块。由于Alpha与Omega之间的连接有可能不可靠,Alpha端很难判断Omega本地事务执行超时是由Alpha与Omega直接的网络引起的还是Omega自身调用的问题,因此设计了事务执行器来监控Omega的本地的执行情况,简化Omega的超时操作。目前Omega的缺省实现是直接调用事务方法,由Alpha的后台服务通过扫描事件表的方式来确定事务执行时间是否超时。


    事务回调在Omega与Alpha建立连接的时候就会向Alpha进行注册,当Alpha需要进行相关的协调操作的时候,会直接调用Omega注册的回调方法进行通信。 由于微服务实例在云化场景启停会很频繁,我们不能假设Alpha一直能找到原有注册上的事务回调, 因此我们建议微服务实例是无状态的,这样Alpha只需要根据服务名就能找到对应的Omega进行通信。


    事务传输模块负责Omega与Alpha之间的通讯,在具体的实现过程中,Pack通过定义相关的Grpc描述接口文件定义了TCC 以及Saga的事务交互方法, 同时也定义了与交互相关的事件。我们借助了Grpc所提供的双向流操作接口实现了Omega与Alpha之间的相互调用。 Omega和Alpha的传输建立在Grpc多语言支持的基础上,为实现多语言版本的Omega奠定了基础。


    Alpha为了实现其事务协调的功能,首先需要通过事务传输(Transaction Transport)接收Omega上传的事件, 并将事件存在事件存储(Event Store)模块中,Alpha通过事件API(Event API)对外提供事件查询服务。Alpha会通过事件扫描器(Event Scanner)对分布式事务的执行事件信息进行扫描分析,识别超时的事务,并向Omega发送相关的指令来完成事务协调的工作。由于Alpha协调是采用多个实例的方式对外提供高可用架构, 这就需要Alpha集群管理器(Alpha Cluster Manger)来管理Alpha集群实例之前的协调。用户可以通过管理终端(Manage console)对分布式事务的执行情况进行监控。



    目前Alpha的事件存储是构建在数据库基础之上的。为了降低系统实现的复杂程度,Alpha集群的高可用架构是建立在数据库集群基础之上的。 为了提高数据库的查询效率,我们会根据全局事务的执行情况将数据存储分成了在线库以及存档库,将未完成的分布式事务事件存储在在线库中, 将已经完成的分布式事务事件存储在存档库中。



    事件API是Alpha对外暴露的Restful事件查询服务。 该模块功能首先应用在Pack的验收测试中,通过事件API验收测试代码可以很方便的了解Alpha内部接收的事件。验收测试通过模拟各种分布式事务执行异常情况(错误或者超时),比对Alpha接收到的事务事件来验证相关的其他事务协调功能是否正确。



    管理终端是一个js的前端界面, 管理终端通过访问事件API提供的Rest服务,向用户提供分布式事务执行情况的统计分析,并且可以追踪单个全局事务的执行情况,找出事务的失败的根源。在Pack 0.3.0 中实现了一部分功能,后续还需要进一步完善,欢迎大家参与进来。



    Alpha集群管理器负责Alpha实例注册工作,管理Alpha中单个服务的执行情况, 并且为Omega提供一个及时更新的服务列表。 通过集群管理器用户可以轻松实现Alpha服务实例的启停操作,以及Alpha服务实例的滚动升级功能。目前这部分的模块还在设计开发中,欢迎对此有兴趣的朋友加入到我们的开发队伍中来。


    小结

    本文从分布式事务需要解决的问题入手,向大家介绍了建立在补充基础之上的基于服务的分布式事务的解决思路。接下来我们结合具体的示例介绍了完美的补偿(TCC)和非完美补偿(Saga)两种分布式事务协调协议,最后结合ServiceComb Pack的实现原理详细介绍了ServiceComb Pack的架构实现。



    在基于服务的分布式事务下篇中,我们将结合具体的示例向大家介绍TCC以及Saga分布式事务协调协议的交互细节,以及如何使用ServiceComb Pack编写TCC 以及Saga 应用。

    DataPipeline |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优

    数见科技 发表了文章 • 0 个评论 • 1230 次浏览 • 2018-09-04 18:36 • 来自相关话题

    胡夕,《Apache Kafka实战》作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。 前言 虽然目前Apache Kafka已经全面进 ...查看全部
    胡夕,《Apache Kafka实战》作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。

    前言

    虽然目前Apache Kafka已经全面进化成一个流处理平台,但大多数的用户依然使用的是其核心功能:消息队列。对于如何有效地监控和调优Kafka是一个大话题,很多用户都有这样的困扰,今天我们就来讨论一下。

    一、Kafka综述

    在讨论具体的监控与调优之前,我想用一张PPT图来简单说明一下当前Kafka生态系统的各个组件。就像我前面所说,Kafka目前已经进化成了一个流处理平台,除了核心的消息队列组件Kafka core之外,社区还新引入Kafka Connect和Kafka Streams两个新的组件:其中前者负责Kafka与外部系统的数据传输;后者则负责对数据进行实时流处理计算。下图罗列了一些关键的Kafka概念。


    幻灯片2.jpg



    二、Kafka监控

    我打算从五个维度来讨论Kafka的监控。首先是要监控Kafka集群所在的主机;第二是监控Kafka broker JVM的表现;第三点,我们要监控Kafka Broker的性能;第四,我们要监控Kafka客户端的性能。这里的所指的是广义的客户端——可能是指我们自己编写的生产者、消费者,也有可能是社区帮我们提供的生产者、消费者,比如说Connect的Sink/Source或Streams等;最后我们需要监控服务器之间的交互行为。


    幻灯片3.jpg



    1.主机监控

    个人认为对于主机的监控是最重要的。因为很多线上环境问题首先表现出来的症状就是主机的某些性能出现了明显的问题。此时通常是运维人员首先发现了它们然后告诉我们这台机器有什么问题,对于Kafka主机监控通常是发现问题的第一步。这一页列出了常见的指标,包括CPU、内存、带宽等数据。需要注意的是CPU使用率的统计。可能大家听过这样的提法:我的Kafka Broker CPU使用率是400%,怎么回事?对于这样的问题,我们首先要搞清楚这个使用率是怎么观测出来的? 很多人拿top命令中的vss或rss字段来表征CPU使用率,但实际上它们并不是真正的CPU使用率——那只是所有CPU共同作用于Kafka进程所花的时间片的比例。举个例子,如果机器上有16个CPU,那么只要这些值没有超过或接近1600, 那么你的CPU使用率实际上是很低的。因此要正确理解这些命令中各个字段的含义。

    这页PPT右边给出了一本书,如果大家想监控主机性能的话,我个人建议这本《SystemsPerformance》就足够了。非常权威的一本书,推荐大家读一下。


    幻灯片4.jpg



    2.监控JVM

    Kafka本身是一个普通的Java进程,所以任何适用于JVM监控的方法对于监控Kafka都是相通的。第一步就是要先了解Kafka应用。比方说了解Kafka broker JVM的GC频率和延时都是多少,每次GC后存活对象的大小是怎样的等。了解了这些信息我们才能明确后面调优的方向。当然,我们毕竟不是特别资深的JVM专家,因此也不必过多追求繁复的JVM监控与调优。只需要关注大的方面即可。另外,如果大家时间很有限但又想快速掌握JVM监控与调优,推荐阅读《Java Performance》。

    3.Per-Broker监控

    首先要确保Broker进程是启动状态?这听起来好像有点搞笑,但我的确遇到过这样的情况。比如当把Kafka部署在Docker上时就容易出现进程启动但服务没有成功启动的情形。正常启动下,一个Kafka服务器起来的时候,应该有两个端口,一个端口是9092常规端口,会建一个TCP链接。还有一个端口是给JMX监控用的。当然有多台broker的话,那么controller机器会为每台broker都维护一个TCP连接。在实际监控时可以有意识地验证这一点。

    对于Broker的监控,我们主要是通过JMS指标来做的。用过Kafka的人知道,Kafka社区提供了特别多的JMS指标,其中很多指标用处不大。我这里列了一些比较重要的:首先是broker机器每秒出入的字节数,就是类似于我可以监控网卡的流量,一定要把这个指标监控起来,并实时与你的网卡带宽进行比较——如果发现该值非常接近于带宽的话,就证明broker负载过高,要么增加新的broker机器,要么把该broker上的负载均衡到其他机器上。

    另外还有两个线程池空闲使用率小关注,最好确保它们的值都不要低于30%,否则说明Broker已经非常的繁忙。 此时需要调整线程池线程数。

    接下来是监控broker服务器的日志。日志中包含了非常丰富的信息。这里所说的日志不仅是broker服务器的日志,还包括Kafka controller的日志。我们需要经常性地查看日志中是否出现了OOM错误抑或是时刻关注日志中抛出的ERROR信息。

    我们还需要监控一些关键后台线程的运行状态。个人认为有两个比较重要的线程需要监控:一个Log Cleaner线程——该线程是执行数据压实操作的,如果该线程出问题了,用户通常无法感知到,然后会发现所有compact策略的topic会越来越大直到占满所有磁盘空间;另一个线程就是副本拉取线程,即follower broker使用该线程实时从leader处拉取数据。如果该线程“挂掉”了,用户通常也是不知道的,但会发现follower不再拉取数据了。因此我们一定要定期地查看这两个线程的状态,如果发现它们意味终止,则去找日志中寻找对应的报错信息。


    幻灯片6.jpg



    4.Clients监控

    客户端监控这块,我这边会分为两个,分别讨论对生产者和消费者的监控。生产者往Kafka发消息,在监控之前我们至少要了解一下客户端机器与Broker端机器之间的RTT是多少。对于那种跨数据中心或者是异地的情况来说,RTT本来就很大,如果不做特殊的调优,是不可能有太高的TPS的。目前Kafka producer是双线程的设计机制,分为用户主线程和Sender线程,当这个Sender线程挂了的时候,前端用户是不感知的,但表现为producer发送消息失败,所以用户最好监控一下这个Sender线程的状态。


    幻灯片7.jpg



    还有就是监控PRODUCE请求的处理延时。一条消息从生产者端发送到Kafka broker进行处理,之后返回给producer的总时间。整个链路中各个环节的耗时最好要做到心中有数。因为很多情况下,如果你要提升生产者的TPS,了解整个链路中的瓶颈后才能做到有的放矢。后面PPT中我会讨论如何拆解这条链路。

    现在说说消费者。这里的消费者说的是新版本的消费者,也就是java consumer。


    幻灯片8.jpg



    社区已经非常不推荐再继续使用老版本的消费者了。新版本的消费者也是双线程设计,后面有一个心跳线程,如果这个线程挂掉的话,前台线程是不知情的。所以,用户最好定期监控该心跳线程的存活情况。心跳线程定期发心跳请求给Kafka服务器,告诉Kafka,这个消费者实例还活着,以避免coordinator错误地认为此实例已“死掉”从而开启rebalance。Kafka提供了很多的JMX指标可以用于监控消费者,最重要的消费进度滞后监控,也就是所谓的consumerlag。

    假设producer生产了100条消息,消费者读取了80条,那么lag就是20。显然落后的越少越好,这表明消费者非常及时,用户也可以用工具行命令来查lag,甚至写Java的API来查。与lag对应的还有一个lead指标,它表征的是消费者领先第一条消息的进度。比如最早的消费位移是1,如果消费者当前消费的消息是10,那么lead就是9。对于lead而言越大越好,否则表明此消费者可能处于停顿状态或者消费的非常慢,本质上lead和lag是一回事,之所以列出来是因为lead指标是我开发的,也算打个广告吧。

    除了以上这些,我们还需要监控消费者组的分区分配情况,避免出现某个实例被分配了过多的分区,导致负载严重不平衡的情况出现。一般来说,如果组内所有消费者订阅的是相同的主题,那么通常不会出现明显的分配倾斜。一旦各个实例订阅的主题不相同且每个主题分区数参差不齐时就极易发生这种不平衡的情况。Kafka目前提供了3种策略来帮助用户完成分区分配,最新的策略是黏性分配策略,它能保证绝对的公平,大家可以去试一下。

    最后就是要监控rebalance的时间——目前来看,组内超多实例的rebalance性能很差,可能都是小时级别的。而且比较悲剧的是当前无较好的解决方案。所以,如果你的Consumer特别特别多的话,一定会有这个问题,你监控一下两个步骤所用的时间,看看是否满足需求,如果不能满足的话,看看能不能把消费者去除,尽量减少消费者数量。

    5.Inter-Broker监控

    最后一个维度就是监控Broker之间的表现,主要是指副本拉取。Follower副本实时拉取leader处的数据,我们自然希望这个拉取过程越快越好。Kafka提供了一个特别重要的JMX指标,叫做备份不足的分区数,比如说我规定了这条消息,应该在三个Broker上面保存,假设只有一个或者两个Broker上保存该消息,那么这条消息所在的分区就被称为“备份不足”的分区。这种情况是特别关注的,因为有可能造成数据的丢失。《Kafka权威指南》一书中是这样说的:如果你只能监控一个Kafka JMX指标,那么就监控这个好了,确保在你的Kafka集群中该值是永远是0。一旦出现大于0的情形赶紧处理。


    幻灯片9.jpg



    还有一个比较重要的指标是表征controller个数的。整个集群中应该确保只能有一台机器的指标是1,其他全应该是0,如果你发现有一台机器是2或者是3,一定是出现脑裂了,此时应该去检查下是否出现了网络分区。Kafka本身是不能对抗脑裂的,完全依靠Zookeeper来做,但是如果真正出现网络分区的话,也是没有办法处理的,不如赶快fail fast掉。

    三、监控工具

    当前没有一款Kafka监控工具是公认比较优秀的,每个都有自己的特点但也有些致命的缺陷。我们针对一些常见的监控工具逐个讨论下。

    1.Kafka Manager

    应该说在所有免费的监控框架中,Kafka Manager是最受欢迎的。它最早由雅虎开源,功能非常齐全,展示的数据非常丰富。另外,用户能够在界面上执行一些简单的集群管理操作。更加令人欣慰的是,该框架目前还在不断维护中,因此使用Kafka manager来监控Kafka是一个不错的选择。

    2.Burrow

    Burrow是去年下半年开源,专门监控消费者信息的框架。这个框架刚开始开源的时候,我还对它还是寄予厚望的,毕竟是Kafka社区committer亲自编写的。不过Burrow的问题在于没有UI界面,不方便运维操作。另外由于是Go语言写的,你要用的话,必须搭建Go语言环境,然后编译部署,总之用起来不是很方便。还有就是它的更新不是很频繁,已经有点半荒废的状态,大家不妨一试。

    3.Kafka Monitor

    严格来说,它不是监控工具,它是专门做Kafka集群系统性测试用的。待监控的指标可以由用户自己设定,主要是做一些端到端的测试。比如说你搭了一套Kafka集群,想测试端到端的性能怎样:从发消息到消费者读取消息这一整体流程的性能。该框架的优势也是由Kafka社区团队写的,质量有保障,但更新不是很频繁,目前好像几个月没有更新了。

    4.Kafka Offset Monitor

    KafkaOffsetMonitor是我用的最早的一个Kafka监控工具,也是监控消费者位移,只不过那时候Kafka把位移保持在Zookeepr上。这个框架的界面非常漂亮,国内用的人很多。但是现在有一个问题,因为我们现在用了新版本的消费者,这个框架目前支持得的并不是特别好。而且还有一个问题就是它已经不再维护了,可能有1-2年没有任何更新了。

    5.Kafka Eagle

    这是国人自己开发的,我不知道具体是哪个大牛开发的,但是在Kafka QQ群里面很多人推崇,因为界面很干净漂亮,上面有很好的数据展现。

    6.Confluent Control Center

    Control Center是目前我能收集到的功能最齐全的Kafka监控框架了,只不过只有购买了Confluent企业版也有的,也就是说是付费的。

    综合来讲,如果你是Kafka集群运维操作人员,推荐先用Kafka Manager来做监控,后面再根据实际监控需求定制化开发特有的工具或框架。

    四、系统调优

    Kafka监控的一个主要的目的就是调优Kafka集群。这里罗列了一些常见的操作系统级的调优。

    首先是保证页缓存的大小——至少要设置页缓存为一个日志段的大小。我们知道Kafka大量使用页缓存,只要保证页缓存足够大,那么消费者读取消息时就有大概率保证它能够直接命中页缓存中的数据而无需从底层磁盘中读取。故只要保证页缓存要满足一个日志段的大小。

    第二是调优文件打开数。很多人对这个资源有点畏手畏脚。实际上这是一个很廉价的资源,设置一个比较大的初始值通常都是没有什么问题的。

    第三是调优vm.max_map_count参数。主要适用于Kafka broker上的主题数超多的情况。Kafka日志段的索引文件是用映射文件的机制来做的,故如果有超多日志段的话,这种索引文件数必然是很多的,极易打爆这个资源限制,所以对于这种情况一般要适当调大这个参数。

    第四是swap的设置。很多文章说把这个值设为0,就是完全禁止swap,我个人不建议这样,因为当你设置成为0的时候,一旦你的内存耗尽了,Linux会自动开启OOM killer然后随机找一个进程杀掉。这并不是我们希望的处理结果。相反,我建议设置该值为一个比较接近零的较小值,这样当我的内存快要耗尽的时候会尝试开启一小部分swap,虽然会导致broker变得非常慢,但至少给了用户发现问题并处理之的机会。

    第五JVM堆大小。首先鉴于目前Kafka新版本已经不支持Java7了,而Java 8本身不更新了,甚至Java9其实都不做了,直接做Java10了,所以我建议Kafka至少搭配Java8来搭建。至于堆的大小,个人认为6-10G足矣。如果出现了堆溢出,就提jira给社区,让他们看到底是怎样的问题。因为这种情况下即使用户调大heap size,也只是延缓OOM而已,不太可能从根本上解决问题。


    幻灯片18.jpg



    最后,建议使用专属的多块磁盘来搭建Kafka集群。自1.1版本起Kafka正式支持JBOD,因此没必要在底层再使用一套RAID了。

    五、Kafka调优的四个层面

    Kafka调优通常可以从4个维度展开,分别是吞吐量、延迟、持久性和可用性。在具体展开这些方面之前,我想先建议用户保证客户端与服务器端版本一致。如果版本不一致,就会出现向下转化的问题。举个例子,服务器端保存高版本的消息,当低版本消费者请求数据时,服务器端就要做转化,先把高版本消息转成低版本再发送给消费者。这件事情本身就非常非常低效。很多文章都讨论过Kafka速度快的原因,其中就谈到了零拷贝技术——即数据不需要在页缓存和堆缓存中来回拷贝。

    简单来说producer把生产的消息放到页缓存上,如果两边版本一致,可以直接把此消息推给Consumer,或者Consumer直接拉取,这个过程是不需要把消息再放到堆缓存。但是你要做向下转化或者版本不一致的话,就要额外把数据再堆上,然后再放回到Consumer上,速度特别慢。

    1.Kafka调优 – 吞吐量

    调优吞吐量就是我们想用更短的时间做更多的事情。这里列出了客户端需要调整的参数。前面说过了producer是把消息放在缓存区,后端Sender线程从缓存区拿出来发到broker。这里面涉及到一个打包的过程,它是批处理的操作,不是一条一条发送的。因此这个包的大小就和TPS息息相关。通常情况下调大这个值都会让TPS提升,但是也不会无限制的增加。不过调高此值的劣处在于消息延迟的增加。除了调整batch.size,设置压缩也可以提升TPS,它能够减少网络传输IO。当前Lz4的压缩效果是最好的,如果客户端机器CPU资源很充足那么建议开启压缩。


    幻灯片21.jpg



    对于消费者端而言,调优TPS并没有太好的办法,能够想到的就是调整fetch.min.bytes。适当地增加该参数的值能够提升consumer端的TPS。对于Broker端而言,通常的瓶颈在于副本拉取消息时间过长,因此可以适当地增加num.replica.fetcher值,利用多个线程同时拉取数据,可以加快这一进程。

    2.Kafka调优 – 延时

    所谓的延时就是指消息被处理的时间。某些情况下我们自然是希望越快越好。针对这方面的调优,consumer端能做的不多,简单保持fetch.min.bytes默认值即可,这样可以保证consumer能够立即返回读取到的数据。讲到这里,可能有人会有这样的疑问:TPS和延时不是一回事吗?假设发一条消息延时是2ms,TPS自然就是500了,因为一秒只能发500消息,其实这两者关系并不是简单的。因为我发一条消息2毫秒,但是如果把消息缓存起来统一发,TPS会提升很多。假设发一条消息依然是2ms,但是我先等8毫秒,在这8毫秒之内可能能收集到一万条消息,然后我再发。相当于你在10毫秒内发了一万条消息,大家可以算一下TPS是多少。事实上,Kafka producer在设计上就是这样的实现原理。

    3.Kafka调优 –消息持久性

    消息持久化本质上就是消息不丢失。Kafka对消息不丢失的承诺是有条件的。以前碰到很多人说我给Kafka发消息,发送失败,消息丢失了,怎么办?严格来说Kafka不认为这种情况属于消息丢失,因为此时消息没有放到Kafka里面。Kafka只对已经提交的消息做有条件的不丢失保障。

    如果要调优持久性,对于producer而言,首先要设置重试以防止因为网络出现瞬时抖动造成消息发送失败。一旦开启了重试,还需要防止乱序的问题。比如说我发送消息1与2,消息2发送成功,消息1发送失败重试,这样消息1就在消息2之后进入Kafka,也就是造成乱序了。如果用户不允许出现这样的情况,那么还需要显式地设置max.in.flight.requests.per.connection为1。


    幻灯片24.jpg



    本页PPT列出的其他参数都是很常规的参数,比如unclean.leader.election.enable参数,最好还是将其设置成false,即不允许“脏”副本被选举为leader。

    4.Kafka调优 –可用性

    最后是可用性,与刚才的持久性是相反的,我允许消息丢失,只要保证系统高可用性即可。因此我需要把consumer心跳超时设置为一个比较小的值,如果给定时间内消费者没有处理完消息,该实例可能就被踢出消费者组。我想要其他消费者更快地知道这个决定,因此调小这个参数的值。

    六、定位性能瓶颈

    下面就是性能瓶颈,严格来说这不是调优,这是解决性能问题。对于生产者来说,如果要定位发送消息的瓶颈很慢,我们需要拆解发送过程中的各个步骤。就像这张图表示的那样,消息的发送共有6步。第一步就是生产者把消息放到Broker,第二、三步就是Broker把消息拿到之后,写到本地磁盘上,第四步是follower broker从Leader拉取消息,第五步是创建response;第六步是发送回去,告诉我已经处理完了。


    幻灯片26.jpg



    这六步当中你需要确定瓶颈在哪?怎么确定?——通过不同的JMX指标。比如说步骤1是慢的,可能你经常碰到超时,你如果在日志里面经常碰到request timeout,就表示1是很慢的,此时要适当增加超时的时间。如果2、3慢的情况下,则可能体现在磁盘IO非常高,导致往磁盘上写数据非常慢。倘若是步骤4慢的话,查看名为remote-time的JMX指标,此时可以增加fetcher线程的数量。如果5慢的话,表现为response在队列导致待的时间过长,这时可以增加网络线程池的大小。6与1是一样的,如果你发现1、6经常出问题的话,查一下你的网络。所以,就这样来分解整个的耗时。这是到底哪一步的瓶颈在哪,需要看看什么样的指标,做怎样的调优。

    七、Java Consumer的调优

    最后说一下Consumer的调优。目前消费者有两种使用方式,一种是同一个线程里面就直接处理,另一种是我采用单独的线程,consumer线程只是做获取消息,消息真正的处理逻辑放到单独的线程池中做。这两种方式有不同的使用场景:第一种方法实现较简单,因为你的消息处理逻辑直接写在一个线程里面就可以了,但是它的缺陷在于TPS可能不会很高,特别是当你的客户端的机器非常强的时候,你用单线程处理的时候是很慢的,因为你没有充分利用线程上的CPU资源。第二种方法的优势是能够充分利用底层服务器的硬件资源,TPS可以做的很高,但是处理提交位移将会很难。

    最后说一下参数,也是网上问的最多的,这几个参数到底是做什么的。第一个参数,就是控制consumer单次处理消息的最大时间。比如说设定的是600s,那么consumer给你10分钟来处理。如果10分钟内consumer无法处理完成,那么coordinator就会认为此consumer已死,从而开启rebalance。

    Coordinator是用来管理消费者组的协调者,协调者如何在有效的时间内,把消费者实例挂掉的消息传递给其他消费者,就靠心跳请求,因此可以设置heartbeat.interval.ms为一个较小的值,比如5s。

    八、Q & A

    Q1:胡老师在前面提到低版本与高版本有一个端口的问题,我想问一下高版本的、低版本的会有这个问题吗?

    A1:会有。

    Q2:两种模式,一个是Consumer怎么做到所有的partition,在里面做管理的。会有一个问题,某个Consumer的消费比较慢,因为所有的Partition的消费都是绑定在一个线程。一个消费比较慢,一个消费比较快,要等另一个。有没有一种方案,消费者比较慢的可以暂定,如果涉及到暂停的话,频繁的暂定耗费的时间,是不是会比较慢?

    A2:一个线程处理所有的分区。如果从开销来讲并不大,但是的确会出现像你说的,如果一个消费者定了100个分区,目前我这边看到的效果,某段时间内有可能会造成某些分区的饿死,比如说某些分区长期得不到数据,可能有一些分区不停的有数据,这种情况下的确有可能情况。但是你说的两种方法本身开销不是很大,因为它就是内存当中的结构变更,就是定位信息,如果segment,就把定位信息先暂时关掉,不涉及到很复杂的数据结构的变更。

    Q3:怎么决定顺序呢?

    A3:这个事情现在在Broker端做的,简单会做轮询,比如说有100个分区,第一批随机给你一批分区,之后这些分区会排到整个队列的末尾,从其他的分区开始给你,做到尽量的公平。

    Q4:消费的时候会出现数据倾斜的情况,这块如何理解?

    A4:数据倾斜。这种情况下发生在每个消费者订阅信息不一样的情况下,特别容易出现数据倾斜。比如说我订阅主题123,我订阅主题456,我们又在同一个组里面这些主题分区数极不相同,很有可能出现我订阅了10个分区,你可能订阅2个分区。如果你用的是有粘性的分配策略,那种保证不会出现超过两个以上相差的情况。这个策略推出的时间也不算短了,是0.11版本推出来的。

    点击这里,免费申请DataPipeline产品试用

    谈谈Apache Mesos和Mesosphere DCOS:历史、架构、发展和应用

    colstuwjx 发表了文章 • 1 个评论 • 15886 次浏览 • 2015-09-20 07:59 • 来自相关话题

    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。 ## Mesos 发展史 Mesos 是 ...查看全部
    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。
    ## Mesos 发展史
    Mesos 是一个早在2009年由 Benjamin Hindman、Andy Konwinski、Matei Zaharia、Ali Ghodsi、Anthony D. Joseph、Randy Katz、Scott Shenker和Ion Stoica几人联合发起的伯克利大学研究项目。Benjamin 随后将其引入 Twitter,而如今它已经完美的运行在他们的数据中心上, Benjamin 本人也在不久之后成为了 Mesosphere 的首席架构师,正是它构建了 Mesosphere 数据中心操作系统(DCOS)。

    Mesos 的设计宗旨在于尝试和提高集群的利用效率和性能,他们认为对于数据中心资源的单纯静态划分和使用的这样一个方式是值得重新考量的,举个例子来说:

    我们假设你的数据中心里拥有9个主机:
    1.png

    如果把它静态的划分开来,并且指定每三个主机承载一个应用,这样一来总共是3个应用(这里是Hadoop、Spark 和 Ruby on Rails)。
    2.png

    显而易见的一个问题是这些主机的资源利用率并不会很高;
    3.png

    因此如果你想使用全部的资源,即这里例子中的全部9台主机,那么就需要将其抽象成一个共享资源池,而你可以按需计划配置,这样的话,利用率自然可以得到相应的提升;
    4.png


    Mesos团队的第二个观点在于他们觉得需要为分布式系统量身定制一套新的系统,换句话说,他们觉得MapReduce并不是适用于所有的场景(这也导致了Spark的诞生,而它又是另外一个故事了),而我们需要一个新的更简单和更具有通用性的专为分布式系统提供服务的这样一个框架。
    ## Mesos 框架(分布式系统)到底是什么?
    一般来说,一个分布式系统你需要有一个Coordinator(调度器)和 多个Worker(执行任务)。调度器以同步(分布式)的方式运行进程/任务,处理程序错误(容错),并且负责优化性能(即弹性伸缩)。换句话说,它负责协调在数据中心去实际执行你想要运行的代码(不需要是一个完整的程序,它也可以是某些种类的运算)。正如之前所提到的那样,Mesos将其称之为联合调度。
    5.png

    或者也可以这么说,Mesos是一个带有调度器的分布式系统。
    6.png

    那么Mesos的真正定位是什么呢? 当你尝试去执行它的任务时你可以理解为它实际上就是机器和调度器之间的一层抽象。

    因此在Mesos里,调度器是和Mesos层(通过API等)通信,而不是直接跟物理机器打交道。Mesos这里通过这样的方式尝试解决的即是资源的静态划分问题,这意味着你不再需要针对每个特定的运行时分配一个对应的调度器去决定实际去执行它的workers,而取而代之的是,你有一个调度器去和Mesos通信,而它会反过来依据整个资源池的剩余资源做调度。
    7.png

    这样做带来的最显而易见的好处就是你可以在一批机器上运行多个不同的分布式系统并且更有效的(不再是静态划分)动态划分和共享这些资源。
    8.png

    其次,之所以这样抽象设计的另外一个重要原因在于它能够提供一个通用功能集(故障检测、分布式任务、任务启动、任务监控、结束任务、清理任务等),这样一来就无需每个分布式系统都各自重复的去实现这样一套逻辑。
    ## Mesos 适合作为数据中心的哪一层的抽象?
    Mesos 这一层抽象实现的目的即是想要尝试通过使用并更好的调度资源使得运行在其之上的这些框架变得更加易于构建和运行。
    9.png

    IaaS的抽象的是机器,例如你给它指定一个数字,它便会生成一堆的机器而这也可以看作是Mesos概念模型更底层化的一个抽象。PaaS则考虑的更多是部署和管理应用/服务,它并不关心底层的那些基础架构,而你可以把它看作是Mesos概念模型的一个更高层面的抽象。在交互方面,PaaS可能是和开发者直接交互,而Mesos则是以API的形式和软件程序交互。

    换句话说,你可以基于Mesos之上构建一个Paas系统(例如像Marathon - 它好像任何地方都比一个真正的Paas系统更像PaaS),同时你可以在一个IaaS上运行Mesos(例如OpenStack)。

    如果你将你的Mesos运行在一个组合系统(例如就像Openstack + 物理硬件 + 虚拟机)之上,那么你可以很直观的再次体会到动态划分资源的好处,那便是你能够跨越这些底层组件而直接的去管理和计划你的工作负载,某种意义上来说,你可以认为Mesos类似于是一个数据中心的内核,即它负责将物理机器抽象成资源,从而使得你能够忽略底层组件的存在,通过消费Mesos的抽象资源来构建分布式系统。

    因此我们可以说,Apache Mesos是为构建和运行其它分布式系统(例如像Spark)提供服务的分布式系统。
    ## Mesos架构内幕
    在 Mesos 里,一个框架程序(或者说分布式系统)发起的一次请求会在被接收到的那个时刻由调度器承接和分配。这跟传统分布式系统一般人为发起请求的方式不太一样(再强调一下,Mesos将会让框架程序发起请求,而不是人工操作),传统的方式即需要在人为发起请求时设定好需要分配的特定资源,然后再去真正请求和获取这些资源,这类情况中最典型的莫过于需求场景的变换(设想在Map/Reduce的场景下,比如在Map和Reduce阶段切换之际产生的一个需求资源的变化)
    10.png

    与传统分布式系统不一样的是,Mesos 将会立马为其分配所能分配的最大资源,而不是傻傻的在那等到满足该请求的资源完成/完全到位(在这里它想要实现的便是在绝大多数情况下十分奏效的无阻塞式资源分配策略,即你无须立马消费预期请求的全量资源的这样的情景)。

    当然,现在框架类应用(分布式系统)也可以使用Mesos提供的资源完成他们自己的调度,这便是所谓的 “二次资源调度”。
    11.png

    最终达到的效果即是你下发的一个任务可以在整个数据中心的任意一个地方提交并且运行。

    构建这样的“二次资源调度”系统的原因在于它可以在同一时间内支持多个分布式系统。同样以上面的例子来解释,Mesos为Spark提供和分配所需的资源。而这里,Spark则负责决策和分配这些可用资源去运行实际任务(即因为可用的资源得以满足需求,所以我才能够实际去运行这些map任务)。
    12.png

    所以一旦一个任务被框架应用提交到Mesos,那么这些任务就必须被实际执行。Mesos master 负责指派任务给每个slave,而每个slave通过上面跑着的agent来管理和运行这些任务。(这即是说如果这个任务是对应的一个命令,那么它会去执行它,如果它需要一些特定的资源来完成这个任务,比如像jar包,那么它会先获取所需的资源,然后在一个沙盒里执行它,最后才发起这个任务)

    或者说你也可以这样,框架应用可以通过一个执行器(框架应用需要一个中间层,这个中间层可以用来多线程执行任务)来灵活的决定它想要执行的任务。
    13.png

    为了保证资源的相对隔离性,Mesos 对 Kernel的cgroups和namespaces 提供了内置的原生支持,当然你也可以将一个Docker容器当做一个任务去运行。这样一来,它便给你提供了一个多租户的(框架)资源池的访问机制(跨主机和主机内部的进程间通信)。

    你可以预请求你所需的资源,当然这样你也就回到了资源固定划分的时代。如果你有一些有状态的应用,那么你需要预定一些资源(这类任务通常需要在同一台主机上运行)并且需要一些持久化的存储卷(数据需要能够支持故障迁移和恢复),而这类需求Mesos同样能够支持。
    ## Mesosphere DCOS
    DCOS(数据中心操作系统)即是Mesos的“核心”与其周边的服务及功能组件所组成的一个生态系统。例如像mesos-dns这样的插件模块,类似一个CLI,一个GUI又或者是提供你想运行的所有的包的仓库等工具,以及像Marathon(又名分布式的init)、Chronos(又名分布式的cron)这样的框架等等。

    顾名思义,它即是意味着一个跨越在数据中心或者云环境所有主机之上的操作系统。DCOS 可以运行在任意的现代Linux环境,公有或私有云,虚拟机甚至是裸机环境。(当前所支持的平台有:亚马逊AWS、谷歌GCE、微软Azure、OpenStack、Vmware、RedHat、CentOS、CoreOS以及Ubuntu)。迄今为止,DCOS 在其公有仓库上已经提供了多达40余种服务组件(Hadoop、Spark、Cassandra、Jenkins、Kafka、MemSQL等等)。

    另附Mesosphere 集群操作系统(DCOS)入门视频

    原文链接:Introduction to Apache Mesos and Mesosphere DCOS (翻译:吴佳兴)

    Fenzo:来自Netflix基于Java语言的Mesos调度器

    edge_dawn 发表了文章 • 0 个评论 • 8658 次浏览 • 2015-09-02 11:55 • 来自相关话题

    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。 Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易 ...查看全部
    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。

    Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易的事情。Netflix是由几十个分布式的服务支撑的,其中每个服务都是产品不可或缺的一部分,并且都在不断迭代着。我们需要从两个方面来优化这些服务,一个是用户体验,另外一个是服务的整体性能以及成本。为此,我们很高兴向大家介绍Fenzo这款开源软件,它是一个使用Java语言编写的Apache Mesos框架的调度器。Fenzo负责管理Netflix内部所有服务的调度和资源分配。

    Fenzo现在已经开源,读者可以在GitHub中了解关于它的更多信息。
    ##为什么使用Fenzo?
    之所以要重新开发一个新框架,而不是利用社区中已有的框架,是因为我们考虑到两个方面,一是调度优化,另一个是希望能够根据资源使用情况来自动缩放集群,这两个方面下文中都将会详细解释。Fenzo更适合管理生命周期短暂(ephemerality)的应用,Netflix的用例包括实时操作的响应式数据流系统以及管理基于容器的应用部署。

    在Netflix中,一天业务数据的变化非常大,如果按照业务峰值时所需要的资源来配置集群资源,那将会非常浪费,并且当出现某些热点事件时,我们的系统是无法应对这样的突发情况的。我们需要利用云的弹性以及基于动态负载来缩放集群。

    虽然扩大集群似乎看起来相对比较容易,但是当集群中可用资源低于某一个阈值时,缩小集群就会带来新的挑战。当存在长期运行的任务,并且不能随便被终止时,例如拓扑结构的状态流处理耗时重构,那么调度器如果想让集群缩小,就必须让这样的主机上的所有任务几乎同时终止。
    ##调度策略
    任务调度需要优化资源分配以最大化预期目标。不同的资源分配方式会对结果产生不同的影响, 包括可伸缩性、性能等方面,因此,高效的资源分配方式对于调度管理器来说是至关重要的。比如,选择分配方式时,逐个评估所有的可用资源以及任务,这在计算方面根本吃不消。
    ##调度模型
    我们的设计专注于大规模部署具有多重约束和优化资源需求的多样化的任务与资源。如果评估最优化分配需要很长时间,就可能造成两个问题:

    • 资源闲置,等待新的任务
    • 任务启动时间增加
    Fenzo采用了能够快速推动我们到正确方向的方式,而不是每次都找出最优的调度分配集。从概念上讲,我们认为任务有一个紧迫因素决定多久需要一个任务分配,以及一个适合度因素决定是否适合一个给定的主机。
    FitnessUrgency.png
    如果任务是非常紧迫的,或者如果它非常适合于一个给定的资源,我们继续并分配资源给这个任务。 否则,我们继续让任务挂起,直到紧迫性增加或发现另一台主机具有较大的适合度。##权衡调度速度与优化Fenzo能够为你动态的选择速度与最优分配。它跨多个主机采用一个评价最优分配策略,但是只有当适合度被认为是“足够好”才能获得这种策略。然而用户为足够好的合适度定义了阈值以控制调度速度,用一个合适度评估插件来表示集群任务分配的最优化和最高级别的调度对象。这个合适度计算器由多个其他合适度计算器组成,代表一个多重面向对象。##任务约束Fenzo任务使用可选的软或硬约束影响分配来实现与其他任务的locality和/或资源的亲和力。软约束满足best efforts基准,结合合适度计算器来给可能分配的主机打分,而硬约束则必须满足和充当一个资源选择过滤器。Fenzo把所有相关的集群状态信息提供给适合度计算器和约束插件,这样就可以优化基于作业、资源和时间的各方面的任务。##封装和约束插件Fenzo目前为封装提供了内置的基于CPU、内存以及网络带宽资源或者是他们集合的适合度计算器。一些内置的约束用于解决资源类型的常见位置公共用例,将一组任务分配给不同的主机,平衡跨越给定主机属性的任务,例如可用区、主机位置等。你可以通过提供的新插件定制合适度计算器和约束。##集群自动缩放Fenzo支持使用两种互补的策略集群自动缩放:
    • 基于阈值
    • 基于资源短缺分析

    基于阈值的自动缩放,用户可以指定每个被用在集群当中的host组(如EC2自动缩放组,ASG)。例如,有可能是使用一个EC2实例类型的计算密集型工作负载创建一个ASG,也可以使用网络密集型工作负载创建另一个ASG。每一条规则有助于保持配置可用于快速启动新作业的一定数量的可用主机。

    利用资源短缺分析试图来估计主机数目,以满足待处理负载。这补充了在需求激增当中基于集群扩大的规则。 Fenzo的自动缩放还补充了预测自动缩放系统,如Netflix Scryer。
    ##在Netflix上的应用
    在Netflix上,Fenzo目前被使用在2个Mesos框架中,用于各种使用案例,包括长时间运行的服务和批处理作业。我们已经看到调度器在多重约束和自定义的适合度计算器的情况下分配资源比较快。此外,Fenzo允许我们根据当前的需求,而不是按照需求的峰值集群规模来调整集群大小。

    下表显示了我们观测到的在我们其中的一个集群中的每个调度运行平均时间和最大时间。每个调度运行可能会试图分配资源给多个任务,而运行时间非常依赖于需要分配的任务数、约束的数量和种类以及从中选择资源的主机数量。
    QQ图片20150902114723.png

    下图显示了在集群中几天内Mesos slave的数量变化,作为Fenzo的自动缩放行为的体现,表示3X在最大和最小数值上的不同。
    TitanAutoscaling2.png

    ##Fenzo 在Mesos 框架上的使用
    FenzoUsageDiagram.png

    上面简易的图示告诉我们Fenzo怎样被Apache Mesos框架使用。Fenzo任务调度提供了一个没有与Mesos自身进行交互的调度核心。Mesos的框架和接口在新的资源和任务状态更新上得到回传,同时它让Mesos driver 启动基于Fenzo的分配任务。
    ##总结
    Fenzo已经成为云平台上的一个很好的帮手,它在Mesos上给我们一个高级别的控制任务调度,而且使我们在机器效率与作业运行快速化方面达到一个平衡。除此之外,Fenzo支持集群的自动缩放和封装。通过编写你自己的插件可以实现自定义调度器。

    源代码在Netflix GitHub上可以找到,资源库当中包含了样本框架教大家如何使用Fenzo,而且在JUnit tests中给出了不同类型的例子包括写自定义的适应度计算器和约束条件。Fenzo wiki包含详细的文档来帮助大家开始学习Fenzo。

    原文链接: Fenzo: OSS Scheduler for Apache Mesos Frameworks(翻译:edge_dawn)

    在Docker上运行Apache Kafka

    Azriel 发表了文章 • 1 个评论 • 23539 次浏览 • 2015-08-05 23:28 • 来自相关话题

    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka ...查看全部
    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka和使用Docker是那样的轻松,还有比这更让人觉得兴奋的事情么!让我们完全从安装、维护机器和软件中解脱出来。

    一直很想鼓捣Apache Kafka,但由于我想鼓捣的事太多,Kafka一直没能得到临幸。直到最近,有人要我尝试下这个“中间人”,看看这东西是否能满足一个项目的需求----其实是两个项目。可以想下我当时的表情。

    我编译了Apache Kafka的源代码,将其连接到了Spark Streaming并尝试回答StackOverflow上的一些问题(在使用Scala的Flink中怎样使用Kafka?怎样用jmxtrans见识Kafka中间人?),更不用说阅读繁多的相关文章和看视频。我对什么场景最适合用Apache Kafka有了清醒地认识。

    在和Codilime里的团队开发DeepSense.io平台时,我们只用Ansible自动化部署。我们也尝试过DockerVagrant,都是为了简化DeepSense.io的部署。

    这时就涉及到了两个需求:为了三个项目而研究Apache Kafka和Docker(包括其他工具)!很神奇,不是么?我终于发现了Docker可以让开发产品和部署变得多么简单。只有亲眼所见时,我才意识到Docker竟能如此简化我的开发过程。现在我会把一切Docker化。得知wurstmeister/kafkawurstmeister/zookeeper镜像时,我不可能更幸福了。运行Apache Kafka和使用Docker最终变得如此轻松愉悦。

    然后我就想我应该分享这份热爱,不仅我,所有人都可以从中受益。

    由于我在Mac OS X上,所以用Docker运行Apache Kafka的步骤依赖于boot2docker----一款轻量级Linux,供一些不原生支持Docker的平台,比如前面提到的Mac OS X以及Windows。

    你将会用到wurstmeister/kafkawurstmeister/zookeeper两个镜像。

    你可以在后台或前台独立于镜像运行容器。视你的Unix技术而言,也就是一两个终端的事。这里就对Apache Kafka和Apache Zookeeper各使用一个终端。我会在另一篇博客中解释Apache Zookeeper的作用。

    下面是用Docker运行Apache Kafka的步骤,假设你已经装好了``boot2docker``和``docker``。
    ➜  ~  boot2docker version
    Boot2Docker-cli version: v1.7.1
    Git commit: 8fdc6f5

    ➜ ~ docker --version
    Docker version 1.7.1, build 786b29d

    我很喜欢homebrew,强烈推荐给Mac OS X的用户。很多程序包只需``brew install``一下就能用,包括``docker``和``boot2docker``。
    # 在两个镜像上运行Kafka
    1,(仅适用Mac OS X和Windows用户)执行``boot2docker up``在Mac OS上启动微型Linux内核。
    ➜  ~  boot2docker up
    Waiting for VM and Docker daemon to start...
    .o
    Started.
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    To connect the Docker client to the Docker daemon, please set:
    export DOCKER_HOST=tcp://192.168.59.104:2376
    export DOCKER_CERT_PATH=/Users/jacek/.boot2docker/certs/boot2docker-vm
    export DOCKER_TLS_VERIFY=1

    2,(仅适用Mac OS X和Windows用户)执行``$(boot2docker shellinit)``设置好终端,让``docker``知道微型Linux内核运行在哪儿(通过``boot2docker``)。为了设置上面的``export``,你必须在所有打开的运行Docker终端中重复这一步骤。如果你遇到``docker``命令的通信问题,记着这一步。
     ➜  ~  $(boot2docker shellinit)
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    3,执行``docker ps``确保为Docker配置好了终端。

    ➜ ~ docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    这时还没有容器运行。一旦首先启动Zookeeper的容器就会有变化,接着是Kafka。

    4,在Docker Hub上创建账号并运行``docker login``保存证书。你不必重复通过``docker pull``从Docker镜像的公用中心下载镜像,把Docker Hub看作存储Docker镜像的GitHub。参考文档使用Docker Hub获得最新信息。

    5,执行``docker pull wurstmeister/kafka``从Docker Hub下载Zookeeper镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/zookeeper
    Pulling repository wurstmeister/zookeeper
    a3075a3d32da: Download complete
    ...
    840840289a0d: Download complete
    e7381f1a45cf: Download complete
    5a6fc057f418: Download complete
    Status: Downloaded newer image for wurstmeister/zookeeper:latest

    你会看到各层的哈希打印在控制台里,符合预期。

    6,执行``docker pull wurstmeister/kafka``从Docker Hub下载Kafka镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/kafka
    latest: Pulling from wurstmeister/kafka
    428b411c28f0: Pull complete
    ...
    422705fe88c8: Pull complete
    02bb7ca441d8: Pull complete
    0f9a08061516: Pull complete
    24fc32f98556: Already exists
    Digest: sha256:06150c136dcfe6e4fbbf37731a2119ea17a953c75902e52775b5511b3572aa1f
    Status: Downloaded newer image for wurstmeister/kafka:latest

    7,在命令行中执行``docker images``验证``wurstmeister/kafka``和``wurstmeister/zookeeper``两个镜像已下载。
     ➜  ~  docker images
    REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
    wurstmeister/kafka latest 24fc32f98556 3 weeks ago 477.6 MB
    wurstmeister/zookeeper latest a3075a3d32da 9 months ago 451 MB

    8,现在可以在一个终端里运行``docker run --name zookeeper -p 2181 -t wurstmeister/zookeeper``引导启动Zookeeper。如果你在Mac OS X或Windows上,记得``$(boot2docker shellinit)``。
     ➜  ~  docker run --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    JMX enabled by default
    Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    2015-07-17 19:10:40,419 [myid:] - INFO [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    ...
    2015-07-17 19:10:40,452 [myid:] - INFO [main:ZooKeeperServer@773] - maxSessionTimeout set to -1
    2015-07-17 19:10:40,464 [myid:] - INFO [main:NIOServerCnxnFactory@94] - binding to port 0.0.0.0/0.0.0.0:2181

    现在ZooKeeper在监听2181端口。用Docker(或者Mac OS上的Boot2Docker)的IP地址远程连接确认下。
     ➜  ~  telnet `boot2docker ip` 2181
    Trying 192.168.59.103...
    Connected to 192.168.59.103.
    Escape character is '^]'.

    9,在另一个终端里执行
    docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka

    记得``$(boot2docker shellinit)``,如果你在Mac OS X或Windows上。
     ➜  ~  docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka
    [2015-07-17 19:32:35,865] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property advertised.port is overridden to 9092 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
    ...
    [2015-07-17 19:32:35,894] INFO Property zookeeper.connect is overridden to 172.17.0.5:2181 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,895] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,924] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,925] INFO [Kafka Server 1], Connecting to zookeeper on 172.17.0.5:2181 (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,934] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2015-07-17 19:32:35,939] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
    ...
    [2015-07-17 19:32:36,093] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    [2015-07-17 19:32:36,095] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
    [2015-07-17 19:32:36,146] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
    [2015-07-17 19:32:36,172] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2015-07-17 19:32:36,253] INFO Registered broker 1 at path /brokers/ids/1 with address 61c359a3136b:9092. (kafka.utils.ZkUtils$)
    [2015-07-17 19:32:36,270] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    [2015-07-17 19:32:36,318] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

    现在你的电脑上运行着依托Docker的Apache Kafka,你是它的的开心用户。用``docker ps``查看容器状态。
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    0b34a9927004 wurstmeister/kafka "/bin/sh -c start-ka 2 minutes ago Up 2 minutes 0.0.0.0:32769->9092/tcp kafka
    14fd32558b1c wurstmeister/zookeeper "/bin/sh -c '/usr/sb 4 minutes ago Up 4 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32768->2181/tcp zookeeper

    10,要结束你的Apache Kafka旅程时,用``docker stop kafka zookeeper``(或``docker stop $(docker ps -aq)``,如果运行的容器只有``kafka``和``zookeeper``)``docker stop``容器。
     ➜  ~  docker stop kafka zookeeper
    kafka
    zookeeper

    之后运行``docker ps``会显示没有正在运行的容器:
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    现在没有正在运行的容器是因为他们被关闭了,这些容器依然可以被再次启动----使用``docker ps -a``查看可以使用的容器。
     ➜  ~  docker ps -a
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    7dde25ff7ec2 wurstmeister/kafka "/bin/sh -c start-ka 15 hours ago Exited (137) 16 seconds ago kafka
    b7b4b675b9c0 wurstmeister/zookeeper "/bin/sh -c '/usr/sb 16 hours ago Exited (137) 5 seconds ago zookeeper

    11,最后,用``boot2docker down``停止``boot2docker``守护进程(仅对于Mac OS X和Windows用户)。
    #结语
    利用wurstmeister/kafkawurstmeister/zookeeper这两个镜像,不用过多修改本地的工作站环境去安装必需的包诸如Apache ZooKeeper,你就能运行Apache Kafka。除了Docker本身(和Boot2Docker,如果你恰好在Mac OS上),你不必担心其它升级软件和依赖,从而将你从花时间安装和维护你的机器和软件中解脱出来。还有,Docker镜像可以被部署到其它机器,确保一致的内部软件环境。

    你可以在博客下方的评论区留言,或者邮件联系我,告诉我你对这个话题的看法。记得在Twitter上关注作者@jaceklaskowski

    原文链接:Apache Kafka on Docker(翻译:Azriel 审校:魏小红)

    Docker镜像apache服务启动失败

    Sonyfe25cp 回复了问题 • 4 人关注 • 2 个回复 • 5579 次浏览 • 2015-02-05 16:48 • 来自相关话题

    谈谈Apache Mesos和Mesosphere DCOS:历史、架构、发展和应用

    colstuwjx 发表了文章 • 1 个评论 • 15886 次浏览 • 2015-09-20 07:59 • 来自相关话题

    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。 ## Mesos 发展史 Mesos 是 ...查看全部
    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。
    ## Mesos 发展史
    Mesos 是一个早在2009年由 Benjamin Hindman、Andy Konwinski、Matei Zaharia、Ali Ghodsi、Anthony D. Joseph、Randy Katz、Scott Shenker和Ion Stoica几人联合发起的伯克利大学研究项目。Benjamin 随后将其引入 Twitter,而如今它已经完美的运行在他们的数据中心上, Benjamin 本人也在不久之后成为了 Mesosphere 的首席架构师,正是它构建了 Mesosphere 数据中心操作系统(DCOS)。

    Mesos 的设计宗旨在于尝试和提高集群的利用效率和性能,他们认为对于数据中心资源的单纯静态划分和使用的这样一个方式是值得重新考量的,举个例子来说:

    我们假设你的数据中心里拥有9个主机:
    1.png

    如果把它静态的划分开来,并且指定每三个主机承载一个应用,这样一来总共是3个应用(这里是Hadoop、Spark 和 Ruby on Rails)。
    2.png

    显而易见的一个问题是这些主机的资源利用率并不会很高;
    3.png

    因此如果你想使用全部的资源,即这里例子中的全部9台主机,那么就需要将其抽象成一个共享资源池,而你可以按需计划配置,这样的话,利用率自然可以得到相应的提升;
    4.png


    Mesos团队的第二个观点在于他们觉得需要为分布式系统量身定制一套新的系统,换句话说,他们觉得MapReduce并不是适用于所有的场景(这也导致了Spark的诞生,而它又是另外一个故事了),而我们需要一个新的更简单和更具有通用性的专为分布式系统提供服务的这样一个框架。
    ## Mesos 框架(分布式系统)到底是什么?
    一般来说,一个分布式系统你需要有一个Coordinator(调度器)和 多个Worker(执行任务)。调度器以同步(分布式)的方式运行进程/任务,处理程序错误(容错),并且负责优化性能(即弹性伸缩)。换句话说,它负责协调在数据中心去实际执行你想要运行的代码(不需要是一个完整的程序,它也可以是某些种类的运算)。正如之前所提到的那样,Mesos将其称之为联合调度。
    5.png

    或者也可以这么说,Mesos是一个带有调度器的分布式系统。
    6.png

    那么Mesos的真正定位是什么呢? 当你尝试去执行它的任务时你可以理解为它实际上就是机器和调度器之间的一层抽象。

    因此在Mesos里,调度器是和Mesos层(通过API等)通信,而不是直接跟物理机器打交道。Mesos这里通过这样的方式尝试解决的即是资源的静态划分问题,这意味着你不再需要针对每个特定的运行时分配一个对应的调度器去决定实际去执行它的workers,而取而代之的是,你有一个调度器去和Mesos通信,而它会反过来依据整个资源池的剩余资源做调度。
    7.png

    这样做带来的最显而易见的好处就是你可以在一批机器上运行多个不同的分布式系统并且更有效的(不再是静态划分)动态划分和共享这些资源。
    8.png

    其次,之所以这样抽象设计的另外一个重要原因在于它能够提供一个通用功能集(故障检测、分布式任务、任务启动、任务监控、结束任务、清理任务等),这样一来就无需每个分布式系统都各自重复的去实现这样一套逻辑。
    ## Mesos 适合作为数据中心的哪一层的抽象?
    Mesos 这一层抽象实现的目的即是想要尝试通过使用并更好的调度资源使得运行在其之上的这些框架变得更加易于构建和运行。
    9.png

    IaaS的抽象的是机器,例如你给它指定一个数字,它便会生成一堆的机器而这也可以看作是Mesos概念模型更底层化的一个抽象。PaaS则考虑的更多是部署和管理应用/服务,它并不关心底层的那些基础架构,而你可以把它看作是Mesos概念模型的一个更高层面的抽象。在交互方面,PaaS可能是和开发者直接交互,而Mesos则是以API的形式和软件程序交互。

    换句话说,你可以基于Mesos之上构建一个Paas系统(例如像Marathon - 它好像任何地方都比一个真正的Paas系统更像PaaS),同时你可以在一个IaaS上运行Mesos(例如OpenStack)。

    如果你将你的Mesos运行在一个组合系统(例如就像Openstack + 物理硬件 + 虚拟机)之上,那么你可以很直观的再次体会到动态划分资源的好处,那便是你能够跨越这些底层组件而直接的去管理和计划你的工作负载,某种意义上来说,你可以认为Mesos类似于是一个数据中心的内核,即它负责将物理机器抽象成资源,从而使得你能够忽略底层组件的存在,通过消费Mesos的抽象资源来构建分布式系统。

    因此我们可以说,Apache Mesos是为构建和运行其它分布式系统(例如像Spark)提供服务的分布式系统。
    ## Mesos架构内幕
    在 Mesos 里,一个框架程序(或者说分布式系统)发起的一次请求会在被接收到的那个时刻由调度器承接和分配。这跟传统分布式系统一般人为发起请求的方式不太一样(再强调一下,Mesos将会让框架程序发起请求,而不是人工操作),传统的方式即需要在人为发起请求时设定好需要分配的特定资源,然后再去真正请求和获取这些资源,这类情况中最典型的莫过于需求场景的变换(设想在Map/Reduce的场景下,比如在Map和Reduce阶段切换之际产生的一个需求资源的变化)
    10.png

    与传统分布式系统不一样的是,Mesos 将会立马为其分配所能分配的最大资源,而不是傻傻的在那等到满足该请求的资源完成/完全到位(在这里它想要实现的便是在绝大多数情况下十分奏效的无阻塞式资源分配策略,即你无须立马消费预期请求的全量资源的这样的情景)。

    当然,现在框架类应用(分布式系统)也可以使用Mesos提供的资源完成他们自己的调度,这便是所谓的 “二次资源调度”。
    11.png

    最终达到的效果即是你下发的一个任务可以在整个数据中心的任意一个地方提交并且运行。

    构建这样的“二次资源调度”系统的原因在于它可以在同一时间内支持多个分布式系统。同样以上面的例子来解释,Mesos为Spark提供和分配所需的资源。而这里,Spark则负责决策和分配这些可用资源去运行实际任务(即因为可用的资源得以满足需求,所以我才能够实际去运行这些map任务)。
    12.png

    所以一旦一个任务被框架应用提交到Mesos,那么这些任务就必须被实际执行。Mesos master 负责指派任务给每个slave,而每个slave通过上面跑着的agent来管理和运行这些任务。(这即是说如果这个任务是对应的一个命令,那么它会去执行它,如果它需要一些特定的资源来完成这个任务,比如像jar包,那么它会先获取所需的资源,然后在一个沙盒里执行它,最后才发起这个任务)

    或者说你也可以这样,框架应用可以通过一个执行器(框架应用需要一个中间层,这个中间层可以用来多线程执行任务)来灵活的决定它想要执行的任务。
    13.png

    为了保证资源的相对隔离性,Mesos 对 Kernel的cgroups和namespaces 提供了内置的原生支持,当然你也可以将一个Docker容器当做一个任务去运行。这样一来,它便给你提供了一个多租户的(框架)资源池的访问机制(跨主机和主机内部的进程间通信)。

    你可以预请求你所需的资源,当然这样你也就回到了资源固定划分的时代。如果你有一些有状态的应用,那么你需要预定一些资源(这类任务通常需要在同一台主机上运行)并且需要一些持久化的存储卷(数据需要能够支持故障迁移和恢复),而这类需求Mesos同样能够支持。
    ## Mesosphere DCOS
    DCOS(数据中心操作系统)即是Mesos的“核心”与其周边的服务及功能组件所组成的一个生态系统。例如像mesos-dns这样的插件模块,类似一个CLI,一个GUI又或者是提供你想运行的所有的包的仓库等工具,以及像Marathon(又名分布式的init)、Chronos(又名分布式的cron)这样的框架等等。

    顾名思义,它即是意味着一个跨越在数据中心或者云环境所有主机之上的操作系统。DCOS 可以运行在任意的现代Linux环境,公有或私有云,虚拟机甚至是裸机环境。(当前所支持的平台有:亚马逊AWS、谷歌GCE、微软Azure、OpenStack、Vmware、RedHat、CentOS、CoreOS以及Ubuntu)。迄今为止,DCOS 在其公有仓库上已经提供了多达40余种服务组件(Hadoop、Spark、Cassandra、Jenkins、Kafka、MemSQL等等)。

    另附Mesosphere 集群操作系统(DCOS)入门视频

    原文链接:Introduction to Apache Mesos and Mesosphere DCOS (翻译:吴佳兴)

    Fenzo:来自Netflix基于Java语言的Mesos调度器

    edge_dawn 发表了文章 • 0 个评论 • 8658 次浏览 • 2015-09-02 11:55 • 来自相关话题

    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。 Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易 ...查看全部
    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。

    Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易的事情。Netflix是由几十个分布式的服务支撑的,其中每个服务都是产品不可或缺的一部分,并且都在不断迭代着。我们需要从两个方面来优化这些服务,一个是用户体验,另外一个是服务的整体性能以及成本。为此,我们很高兴向大家介绍Fenzo这款开源软件,它是一个使用Java语言编写的Apache Mesos框架的调度器。Fenzo负责管理Netflix内部所有服务的调度和资源分配。

    Fenzo现在已经开源,读者可以在GitHub中了解关于它的更多信息。
    ##为什么使用Fenzo?
    之所以要重新开发一个新框架,而不是利用社区中已有的框架,是因为我们考虑到两个方面,一是调度优化,另一个是希望能够根据资源使用情况来自动缩放集群,这两个方面下文中都将会详细解释。Fenzo更适合管理生命周期短暂(ephemerality)的应用,Netflix的用例包括实时操作的响应式数据流系统以及管理基于容器的应用部署。

    在Netflix中,一天业务数据的变化非常大,如果按照业务峰值时所需要的资源来配置集群资源,那将会非常浪费,并且当出现某些热点事件时,我们的系统是无法应对这样的突发情况的。我们需要利用云的弹性以及基于动态负载来缩放集群。

    虽然扩大集群似乎看起来相对比较容易,但是当集群中可用资源低于某一个阈值时,缩小集群就会带来新的挑战。当存在长期运行的任务,并且不能随便被终止时,例如拓扑结构的状态流处理耗时重构,那么调度器如果想让集群缩小,就必须让这样的主机上的所有任务几乎同时终止。
    ##调度策略
    任务调度需要优化资源分配以最大化预期目标。不同的资源分配方式会对结果产生不同的影响, 包括可伸缩性、性能等方面,因此,高效的资源分配方式对于调度管理器来说是至关重要的。比如,选择分配方式时,逐个评估所有的可用资源以及任务,这在计算方面根本吃不消。
    ##调度模型
    我们的设计专注于大规模部署具有多重约束和优化资源需求的多样化的任务与资源。如果评估最优化分配需要很长时间,就可能造成两个问题:

    • 资源闲置,等待新的任务
    • 任务启动时间增加
    Fenzo采用了能够快速推动我们到正确方向的方式,而不是每次都找出最优的调度分配集。从概念上讲,我们认为任务有一个紧迫因素决定多久需要一个任务分配,以及一个适合度因素决定是否适合一个给定的主机。
    FitnessUrgency.png
    如果任务是非常紧迫的,或者如果它非常适合于一个给定的资源,我们继续并分配资源给这个任务。 否则,我们继续让任务挂起,直到紧迫性增加或发现另一台主机具有较大的适合度。##权衡调度速度与优化Fenzo能够为你动态的选择速度与最优分配。它跨多个主机采用一个评价最优分配策略,但是只有当适合度被认为是“足够好”才能获得这种策略。然而用户为足够好的合适度定义了阈值以控制调度速度,用一个合适度评估插件来表示集群任务分配的最优化和最高级别的调度对象。这个合适度计算器由多个其他合适度计算器组成,代表一个多重面向对象。##任务约束Fenzo任务使用可选的软或硬约束影响分配来实现与其他任务的locality和/或资源的亲和力。软约束满足best efforts基准,结合合适度计算器来给可能分配的主机打分,而硬约束则必须满足和充当一个资源选择过滤器。Fenzo把所有相关的集群状态信息提供给适合度计算器和约束插件,这样就可以优化基于作业、资源和时间的各方面的任务。##封装和约束插件Fenzo目前为封装提供了内置的基于CPU、内存以及网络带宽资源或者是他们集合的适合度计算器。一些内置的约束用于解决资源类型的常见位置公共用例,将一组任务分配给不同的主机,平衡跨越给定主机属性的任务,例如可用区、主机位置等。你可以通过提供的新插件定制合适度计算器和约束。##集群自动缩放Fenzo支持使用两种互补的策略集群自动缩放:
    • 基于阈值
    • 基于资源短缺分析

    基于阈值的自动缩放,用户可以指定每个被用在集群当中的host组(如EC2自动缩放组,ASG)。例如,有可能是使用一个EC2实例类型的计算密集型工作负载创建一个ASG,也可以使用网络密集型工作负载创建另一个ASG。每一条规则有助于保持配置可用于快速启动新作业的一定数量的可用主机。

    利用资源短缺分析试图来估计主机数目,以满足待处理负载。这补充了在需求激增当中基于集群扩大的规则。 Fenzo的自动缩放还补充了预测自动缩放系统,如Netflix Scryer。
    ##在Netflix上的应用
    在Netflix上,Fenzo目前被使用在2个Mesos框架中,用于各种使用案例,包括长时间运行的服务和批处理作业。我们已经看到调度器在多重约束和自定义的适合度计算器的情况下分配资源比较快。此外,Fenzo允许我们根据当前的需求,而不是按照需求的峰值集群规模来调整集群大小。

    下表显示了我们观测到的在我们其中的一个集群中的每个调度运行平均时间和最大时间。每个调度运行可能会试图分配资源给多个任务,而运行时间非常依赖于需要分配的任务数、约束的数量和种类以及从中选择资源的主机数量。
    QQ图片20150902114723.png

    下图显示了在集群中几天内Mesos slave的数量变化,作为Fenzo的自动缩放行为的体现,表示3X在最大和最小数值上的不同。
    TitanAutoscaling2.png

    ##Fenzo 在Mesos 框架上的使用
    FenzoUsageDiagram.png

    上面简易的图示告诉我们Fenzo怎样被Apache Mesos框架使用。Fenzo任务调度提供了一个没有与Mesos自身进行交互的调度核心。Mesos的框架和接口在新的资源和任务状态更新上得到回传,同时它让Mesos driver 启动基于Fenzo的分配任务。
    ##总结
    Fenzo已经成为云平台上的一个很好的帮手,它在Mesos上给我们一个高级别的控制任务调度,而且使我们在机器效率与作业运行快速化方面达到一个平衡。除此之外,Fenzo支持集群的自动缩放和封装。通过编写你自己的插件可以实现自定义调度器。

    源代码在Netflix GitHub上可以找到,资源库当中包含了样本框架教大家如何使用Fenzo,而且在JUnit tests中给出了不同类型的例子包括写自定义的适应度计算器和约束条件。Fenzo wiki包含详细的文档来帮助大家开始学习Fenzo。

    原文链接: Fenzo: OSS Scheduler for Apache Mesos Frameworks(翻译:edge_dawn)

    在Docker上运行Apache Kafka

    Azriel 发表了文章 • 1 个评论 • 23539 次浏览 • 2015-08-05 23:28 • 来自相关话题

    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka ...查看全部
    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka和使用Docker是那样的轻松,还有比这更让人觉得兴奋的事情么!让我们完全从安装、维护机器和软件中解脱出来。

    一直很想鼓捣Apache Kafka,但由于我想鼓捣的事太多,Kafka一直没能得到临幸。直到最近,有人要我尝试下这个“中间人”,看看这东西是否能满足一个项目的需求----其实是两个项目。可以想下我当时的表情。

    我编译了Apache Kafka的源代码,将其连接到了Spark Streaming并尝试回答StackOverflow上的一些问题(在使用Scala的Flink中怎样使用Kafka?怎样用jmxtrans见识Kafka中间人?),更不用说阅读繁多的相关文章和看视频。我对什么场景最适合用Apache Kafka有了清醒地认识。

    在和Codilime里的团队开发DeepSense.io平台时,我们只用Ansible自动化部署。我们也尝试过DockerVagrant,都是为了简化DeepSense.io的部署。

    这时就涉及到了两个需求:为了三个项目而研究Apache Kafka和Docker(包括其他工具)!很神奇,不是么?我终于发现了Docker可以让开发产品和部署变得多么简单。只有亲眼所见时,我才意识到Docker竟能如此简化我的开发过程。现在我会把一切Docker化。得知wurstmeister/kafkawurstmeister/zookeeper镜像时,我不可能更幸福了。运行Apache Kafka和使用Docker最终变得如此轻松愉悦。

    然后我就想我应该分享这份热爱,不仅我,所有人都可以从中受益。

    由于我在Mac OS X上,所以用Docker运行Apache Kafka的步骤依赖于boot2docker----一款轻量级Linux,供一些不原生支持Docker的平台,比如前面提到的Mac OS X以及Windows。

    你将会用到wurstmeister/kafkawurstmeister/zookeeper两个镜像。

    你可以在后台或前台独立于镜像运行容器。视你的Unix技术而言,也就是一两个终端的事。这里就对Apache Kafka和Apache Zookeeper各使用一个终端。我会在另一篇博客中解释Apache Zookeeper的作用。

    下面是用Docker运行Apache Kafka的步骤,假设你已经装好了``boot2docker``和``docker``。
    ➜  ~  boot2docker version
    Boot2Docker-cli version: v1.7.1
    Git commit: 8fdc6f5

    ➜ ~ docker --version
    Docker version 1.7.1, build 786b29d

    我很喜欢homebrew,强烈推荐给Mac OS X的用户。很多程序包只需``brew install``一下就能用,包括``docker``和``boot2docker``。
    # 在两个镜像上运行Kafka
    1,(仅适用Mac OS X和Windows用户)执行``boot2docker up``在Mac OS上启动微型Linux内核。
    ➜  ~  boot2docker up
    Waiting for VM and Docker daemon to start...
    .o
    Started.
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    To connect the Docker client to the Docker daemon, please set:
    export DOCKER_HOST=tcp://192.168.59.104:2376
    export DOCKER_CERT_PATH=/Users/jacek/.boot2docker/certs/boot2docker-vm
    export DOCKER_TLS_VERIFY=1

    2,(仅适用Mac OS X和Windows用户)执行``$(boot2docker shellinit)``设置好终端,让``docker``知道微型Linux内核运行在哪儿(通过``boot2docker``)。为了设置上面的``export``,你必须在所有打开的运行Docker终端中重复这一步骤。如果你遇到``docker``命令的通信问题,记着这一步。
     ➜  ~  $(boot2docker shellinit)
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    3,执行``docker ps``确保为Docker配置好了终端。

    ➜ ~ docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    这时还没有容器运行。一旦首先启动Zookeeper的容器就会有变化,接着是Kafka。

    4,在Docker Hub上创建账号并运行``docker login``保存证书。你不必重复通过``docker pull``从Docker镜像的公用中心下载镜像,把Docker Hub看作存储Docker镜像的GitHub。参考文档使用Docker Hub获得最新信息。

    5,执行``docker pull wurstmeister/kafka``从Docker Hub下载Zookeeper镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/zookeeper
    Pulling repository wurstmeister/zookeeper
    a3075a3d32da: Download complete
    ...
    840840289a0d: Download complete
    e7381f1a45cf: Download complete
    5a6fc057f418: Download complete
    Status: Downloaded newer image for wurstmeister/zookeeper:latest

    你会看到各层的哈希打印在控制台里,符合预期。

    6,执行``docker pull wurstmeister/kafka``从Docker Hub下载Kafka镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/kafka
    latest: Pulling from wurstmeister/kafka
    428b411c28f0: Pull complete
    ...
    422705fe88c8: Pull complete
    02bb7ca441d8: Pull complete
    0f9a08061516: Pull complete
    24fc32f98556: Already exists
    Digest: sha256:06150c136dcfe6e4fbbf37731a2119ea17a953c75902e52775b5511b3572aa1f
    Status: Downloaded newer image for wurstmeister/kafka:latest

    7,在命令行中执行``docker images``验证``wurstmeister/kafka``和``wurstmeister/zookeeper``两个镜像已下载。
     ➜  ~  docker images
    REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
    wurstmeister/kafka latest 24fc32f98556 3 weeks ago 477.6 MB
    wurstmeister/zookeeper latest a3075a3d32da 9 months ago 451 MB

    8,现在可以在一个终端里运行``docker run --name zookeeper -p 2181 -t wurstmeister/zookeeper``引导启动Zookeeper。如果你在Mac OS X或Windows上,记得``$(boot2docker shellinit)``。
     ➜  ~  docker run --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    JMX enabled by default
    Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    2015-07-17 19:10:40,419 [myid:] - INFO [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    ...
    2015-07-17 19:10:40,452 [myid:] - INFO [main:ZooKeeperServer@773] - maxSessionTimeout set to -1
    2015-07-17 19:10:40,464 [myid:] - INFO [main:NIOServerCnxnFactory@94] - binding to port 0.0.0.0/0.0.0.0:2181

    现在ZooKeeper在监听2181端口。用Docker(或者Mac OS上的Boot2Docker)的IP地址远程连接确认下。
     ➜  ~  telnet `boot2docker ip` 2181
    Trying 192.168.59.103...
    Connected to 192.168.59.103.
    Escape character is '^]'.

    9,在另一个终端里执行
    docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka

    记得``$(boot2docker shellinit)``,如果你在Mac OS X或Windows上。
     ➜  ~  docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka
    [2015-07-17 19:32:35,865] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property advertised.port is overridden to 9092 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
    ...
    [2015-07-17 19:32:35,894] INFO Property zookeeper.connect is overridden to 172.17.0.5:2181 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,895] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,924] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,925] INFO [Kafka Server 1], Connecting to zookeeper on 172.17.0.5:2181 (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,934] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2015-07-17 19:32:35,939] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
    ...
    [2015-07-17 19:32:36,093] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    [2015-07-17 19:32:36,095] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
    [2015-07-17 19:32:36,146] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
    [2015-07-17 19:32:36,172] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2015-07-17 19:32:36,253] INFO Registered broker 1 at path /brokers/ids/1 with address 61c359a3136b:9092. (kafka.utils.ZkUtils$)
    [2015-07-17 19:32:36,270] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    [2015-07-17 19:32:36,318] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

    现在你的电脑上运行着依托Docker的Apache Kafka,你是它的的开心用户。用``docker ps``查看容器状态。
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    0b34a9927004 wurstmeister/kafka "/bin/sh -c start-ka 2 minutes ago Up 2 minutes 0.0.0.0:32769->9092/tcp kafka
    14fd32558b1c wurstmeister/zookeeper "/bin/sh -c '/usr/sb 4 minutes ago Up 4 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32768->2181/tcp zookeeper

    10,要结束你的Apache Kafka旅程时,用``docker stop kafka zookeeper``(或``docker stop $(docker ps -aq)``,如果运行的容器只有``kafka``和``zookeeper``)``docker stop``容器。
     ➜  ~  docker stop kafka zookeeper
    kafka
    zookeeper

    之后运行``docker ps``会显示没有正在运行的容器:
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    现在没有正在运行的容器是因为他们被关闭了,这些容器依然可以被再次启动----使用``docker ps -a``查看可以使用的容器。
     ➜  ~  docker ps -a
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    7dde25ff7ec2 wurstmeister/kafka "/bin/sh -c start-ka 15 hours ago Exited (137) 16 seconds ago kafka
    b7b4b675b9c0 wurstmeister/zookeeper "/bin/sh -c '/usr/sb 16 hours ago Exited (137) 5 seconds ago zookeeper

    11,最后,用``boot2docker down``停止``boot2docker``守护进程(仅对于Mac OS X和Windows用户)。
    #结语
    利用wurstmeister/kafkawurstmeister/zookeeper这两个镜像,不用过多修改本地的工作站环境去安装必需的包诸如Apache ZooKeeper,你就能运行Apache Kafka。除了Docker本身(和Boot2Docker,如果你恰好在Mac OS上),你不必担心其它升级软件和依赖,从而将你从花时间安装和维护你的机器和软件中解脱出来。还有,Docker镜像可以被部署到其它机器,确保一致的内部软件环境。

    你可以在博客下方的评论区留言,或者邮件联系我,告诉我你对这个话题的看法。记得在Twitter上关注作者@jaceklaskowski

    原文链接:Apache Kafka on Docker(翻译:Azriel 审校:魏小红)

    Docker镜像apache服务启动失败

    回复

    Sonyfe25cp 回复了问题 • 4 人关注 • 2 个回复 • 5579 次浏览 • 2015-02-05 16:48 • 来自相关话题

    基于服务的分布式事务(上)

    微服务蜂巢 发表了文章 • 0 个评论 • 734 次浏览 • 2019-02-26 10:19 • 来自相关话题

    传统数据库事务 在传统单体应用架构下,我们通常会将业务数据存储在一个数据库中,应用各模块直接对数据库进行操作业务数据。由数据库提供基于ACID的事务保证。 A是Atomic 原子性:事务作为整体来执行,要么 ...查看全部
    传统数据库事务

    在传统单体应用架构下,我们通常会将业务数据存储在一个数据库中,应用各模块直接对数据库进行操作业务数据。由数据库提供基于ACID的事务保证。

    • A是Atomic 原子性:事务作为整体来执行,要么全部执行,要么都不执行。
    • C是Consistency 一致性:事务应确保数据从一个一致的状态转变为另一个一致的状态。
    • I是 Isolation 隔离性:多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
    • D是Durability 持久性:已提交的事务修改数据会被持久保持。
    例如一个电商的下单操作,就涉及到用户系统、库存系统、支付系统以及配送系统等一系列的协同操作。我们在执行下单操作的过程中,如果出现库存短缺,或者用户账户余额不足的情况,这个下单操作就会涉及到一系列的相关业务系统调用。如果这些子系统连接同一个数据库,我们可以通过数据库提供的事务原子性机制将库存数量校验以及用户余额校验的工作,和执行具体的下单业务操作组合成为一个数据库事务操作。通过数据库事务原子性来保证系统各个模块的调用要么都成功,要么都失败(取消)。 同时,由于数据库提供一致性和持久性保证,保证了如果事务执行成功并提交,本次业务操作的数据在立即生效的同时不会产生异议。 同时数据库提供了不同级别的数据锁机制保证应用多个线程同时读取或者更新数据的过程中不会相互影响,从而来保证业务操作的隔离性。 微服务的分布式事务 随着微服务架构的流行,很多大型的业务流程被拆分成为了多个功能单一的基础服务,大家会根据业务的诉求在这些基础服务之上编写一些组合调用服务来满足业务诉求。为了保证微服务能够独立开发部署运行,通常我们会采用一个微服务对应一个数据库的架构,将内部数据经微服务封装之后,以服务的方式对外暴露。这样以往基于数据库来实现的数据操作,就变成了多个对外提供服务的微服务系统的协同完成操作。因为单个微服务只知道自己的服务执行情况,为了保证分布事务的一致性,参与分布式事务的微服务通常会依托协调器完成相关的一致性协调操作。在十多年前分布式事务的实现方案有CORBA的 Object Transaction Service(OTS)、J2EE的 Java Transaction API 以及 Java Transaction Service。这些事务管理以及事务服务的技术都是建立在ACID事务的概念上的。协调器依托于底层的资源交互协议实现资源的占用以及提交的操作,通过两阶段提交的方式实现分布式事务的强一致操作。两阶段提交将分布式事务操作分为准备和提交两个阶段:系统在准备解决阶段完成资源操作, 如果准备阶段中出现问题,支持回滚操作,但是在提交阶段是不允许出错的。两阶段在保证事务原子性上做了很多工作,但是两阶段提交最大的问题是在分布式事务执行过程中, 所有参与事务的节点资源都是被锁定的,系统不允许其他节点访问锁定的资源,在这种执行下很难进一步提升系统的执行效率。如上所述,在ACID的事务执行过程中,为了保证事务的隔离性,通常我们会采用读写加锁的方式,通过串行处理数据方式,保证多个事务在同时执行的过程中不会相互影响。也就是说只有当事务提交并且保存修改记录或者回退取消修改记录之后,其他的事务才能继续执行。然而对于由多个事务组成的长时间运行的事务来说,如果在整个事务的执行过程都采用这种机制来保证事务的隔离性是一种很低效的解决方案。那我们有什么办法即提高系统运行效率,又能保证事务的数据一致性呢?答案是采用补偿的方式来解决这一问题。 基于补偿的事务实现 补偿是指我们将一个事务分成一个本地执行的正常操作事务和一个逻辑上对之前的操作进行补偿的事务。这样采用补偿事务的方式,我们可以把一个长时间运行的事务分化成若干个可以立即提交的本地事务调用,而不是一个长时间占用锁资源的巨型事务。 这样做的最大好处就是极大降低锁占用的时间。作为代价,补偿方式的取消操作和以往的实现方式有很大的不同,我们需要执行一个单独的ACID事务来完成对之前已提交的事务的逻辑补偿。下图展示了一个典型的分布式事务调用, 用户请求触发事务初始服务, 事务初始服务会顺序调用两个事务参与服务(服务A,服务B)。由于这两个事务参与服务之间没有联系,当事务参与服务执行出现了问题,需要一个协调器参与相关的恢复操作。
    01.jpg
    这里我们可以根据补偿执行的不同将其分成两组不同的补偿方式:不完美补偿 - 反向操作会留下之前原始事务操作的痕迹,一般来说我们是会在原始事务记录中设置取消状态。完美补偿 - 反向逻辑会彻底清理之前的原始事务操作,一般来说是不会保留原始事务交易记录,用户是感知不到事务取消之前的状态信息的。对于采用不完美的补偿方式的系统(Saga实现)来说:我们的补偿事务逻辑和其他的事务逻辑相比没有什么不同, 系统只需要像执行其他业务逻辑一样执行相关的补偿操作即可,无需设置特殊的处理逻辑来恢复事务执行之前的状态。以我们常见的银行ATM取款业务为例,银行账户预先进行扣减的操作,如果取款不成功,其逻辑恢复操作就是通过冲正的方式将预先扣减的款项打回到用户账户,我们可以通过查看账户的交易记录找到扣减和冲正的记录信息。下图展示的内容就是当初始服务调用分别调用服务A和服务B,服务B执行出现错误,这个时候我们事务协调器会调用服务A的冲正方法将系统状态恢复到执行服务调用之前的状态。对于采用完美补偿方式的系统(Try-Cancel/Confirm实现)来说:为了让系统能够在补偿操作彻底清除事务执行的情况,我们会借助两阶段提交协议来完成这部分的功能。在TCC方式下,cancel补偿显然是在第二阶段需要执行业务逻辑来取消第一阶段产生的后果。try是在第一阶段执行相关的业务操作,完成相关业务资源的占用,例如预先分配票务资源,或者检查并刷新用户的账户信用额度。在cancel阶段释放相关的业务资源,例如释放预先分配的票务资源或者恢复之前占用的用户信用额度。那我们为什么还要加入confirm操作呢?这需要从业务资源的使用生命周期来入手。在try过程中,我们只是占用的业务资源,相关的执行操作只是出于待定状态,只有在确认操作执行完毕之后,业务资源才能真正被确认。例如订票业务的try操作,我们只是占用了相关的票务资源。目的是防止票务资源被其他用户占用,但是业务还没有执行完毕,票务提供方还不能将被占用的票务资源统计为已售出票务。 只有相关票务资源被确认售出的之后,票务提供方才能将其统计为已售出票务资源。 ServiceComb Pack架构介绍 通过上面的分析我们可以发现一个有意思的现象,每一步事务的操作都有可能会根据业务的执行情况提供一个补偿操作,通过一个事务管理系统来协调这个补偿操作可以帮我们大大降低业务流程建模的复杂度。在分布式事务实现过程中, 协调器的作用非常重要, 各个事务的参与方需要跟协调器建立好良好的沟通, 由协调器统一调度完成相关事务的执行或者取消的操作。ServiceComb Pack架构如下图所示,主要包含两个组件,即Alpha和Omega,其中:
    02.png
    Alpha充当协调者的角色,主要负责对事务的事件进行持久化存储以及协调子事务的状态,使其最终得以与全局事务的状态保持一致,即保证事务中的子事务要么全执行,要么全不执行。Omega是微服务中内嵌的一个agent,负责对监控本地事务执行情况并向Alpha上报事务执行事件,并在异常情况下根据alpha下发的指令执行相应的补偿或重试操作。Omega可以通过向Alpha发送消息的方式向Alpha实时传递事务执行的进展,但是Alpha怎么知道这些Omega上传的消息是相互关联的呢?我们通过在服务调用过程中插入唯一的全局事务ID,并在后续的调用其它服务过程中传递这个全局事务ID。通过全局事务ID可以从汇总到Alpha事件中找到事件与之相关联的所有事件,通过对这些事件信息进行分析,我们可以完整地追踪到与分布式事务执行情况。
    03.jpg
    Omega会以切面编程的方式向应用程序注入相关的处理模块,帮助我们构建分布式事务调用的上下文。Omega在事务处理初始阶段处理事务的相关准备的操作,并在事务执行完毕后做一些清理的操作,例如创建分布式事务起始事件,以及相关的子事件,根据事务执行的成功或者失败生成相关的事务终止或者失败事件。这样带来的好处是用户的代码只需要添加几个annotation来描述分布式事务执行范围,以及与本地的事务处理恢复的相关函数信息,Omega就能通过切面注入的代码追踪本地事务的执行情况。Omega会将本地事务执行的情况以事件的方式通知给Alpha。由于单个Omega不可能知晓一个分布式事务下其他参与服务的执行情况,如此一来,就需要Alpha扮演一个十分重要的协调者的角色。Alpha将收集到的分布式事务事件信息整理汇总,通过分析这些事件之间的关系可以了解到分布式事务的执行情况。Alpha通过向Omega下发相关的执行指令由Omega执行相关提交或恢复操作,实现分布式事务的最终一致性。
    04.jpg
    在了解过Pack实现的部分细节之后, 我们可以从下图进一步了解ServiceComb Pack架构下,Alpha与Omega内部各模块之间的关系图。
    05.jpg
    整个架构分为三个部分:[list=1]
  • 一个是Alpha协调器(支持多个实例提供高可用支持)。
  • 二是注入到微服务实例中的Omega。
  • 三是Alpha与Omega之间的交互协议。
  • 目前ServiceComb Pack支持Saga 以及TCC两种分布式事务协调协议实现。Omega包含了与分析用户分布式事务逻辑相关的模块:
    • 事务注解模块(Transaction Annotation)
    • 事务拦截器(Transaction Interceptor)
    • 分布式事务执行相关的事务上下文(Transaction Context)
    • 事务回调(Transaction Callback)
    • 事务执行器(Transaction Executor)
    • 以及负责与Alpha进行通讯的事务传输(Transaction Transport)模块。


    事务注解模块是分布式事务的用户界面,用户将这些标注添加到自己的业务代码之上用以描述与分布式事务相关的信息,这样Omega就可以按照分布式事务的协调要求进行相关的处理。如果大家扩展自己的分布式事务,也可以通过定义自己的事务标注来实现。



    事务拦截器这个模块我们可以借助AOP手段,在用户标注的代码基础上添加相关的拦截代码,获取到与分布式事务以及本地事务执行相关的信息,并借助事务传输模块与Alpha进行通讯传递事件。

    事务上下文为Omega内部提供了一个传递事务调用信息的一个手段,借助前面提到的全局事务ID以及本地事务ID的对应关系,Alpha可以很容易检索到与一个分布式事务相关的所有本地事务事件信息。



    事务执行器主要是为了处理事务调用超时设计的模块。由于Alpha与Omega之间的连接有可能不可靠,Alpha端很难判断Omega本地事务执行超时是由Alpha与Omega直接的网络引起的还是Omega自身调用的问题,因此设计了事务执行器来监控Omega的本地的执行情况,简化Omega的超时操作。目前Omega的缺省实现是直接调用事务方法,由Alpha的后台服务通过扫描事件表的方式来确定事务执行时间是否超时。


    事务回调在Omega与Alpha建立连接的时候就会向Alpha进行注册,当Alpha需要进行相关的协调操作的时候,会直接调用Omega注册的回调方法进行通信。 由于微服务实例在云化场景启停会很频繁,我们不能假设Alpha一直能找到原有注册上的事务回调, 因此我们建议微服务实例是无状态的,这样Alpha只需要根据服务名就能找到对应的Omega进行通信。


    事务传输模块负责Omega与Alpha之间的通讯,在具体的实现过程中,Pack通过定义相关的Grpc描述接口文件定义了TCC 以及Saga的事务交互方法, 同时也定义了与交互相关的事件。我们借助了Grpc所提供的双向流操作接口实现了Omega与Alpha之间的相互调用。 Omega和Alpha的传输建立在Grpc多语言支持的基础上,为实现多语言版本的Omega奠定了基础。


    Alpha为了实现其事务协调的功能,首先需要通过事务传输(Transaction Transport)接收Omega上传的事件, 并将事件存在事件存储(Event Store)模块中,Alpha通过事件API(Event API)对外提供事件查询服务。Alpha会通过事件扫描器(Event Scanner)对分布式事务的执行事件信息进行扫描分析,识别超时的事务,并向Omega发送相关的指令来完成事务协调的工作。由于Alpha协调是采用多个实例的方式对外提供高可用架构, 这就需要Alpha集群管理器(Alpha Cluster Manger)来管理Alpha集群实例之前的协调。用户可以通过管理终端(Manage console)对分布式事务的执行情况进行监控。



    目前Alpha的事件存储是构建在数据库基础之上的。为了降低系统实现的复杂程度,Alpha集群的高可用架构是建立在数据库集群基础之上的。 为了提高数据库的查询效率,我们会根据全局事务的执行情况将数据存储分成了在线库以及存档库,将未完成的分布式事务事件存储在在线库中, 将已经完成的分布式事务事件存储在存档库中。



    事件API是Alpha对外暴露的Restful事件查询服务。 该模块功能首先应用在Pack的验收测试中,通过事件API验收测试代码可以很方便的了解Alpha内部接收的事件。验收测试通过模拟各种分布式事务执行异常情况(错误或者超时),比对Alpha接收到的事务事件来验证相关的其他事务协调功能是否正确。



    管理终端是一个js的前端界面, 管理终端通过访问事件API提供的Rest服务,向用户提供分布式事务执行情况的统计分析,并且可以追踪单个全局事务的执行情况,找出事务的失败的根源。在Pack 0.3.0 中实现了一部分功能,后续还需要进一步完善,欢迎大家参与进来。



    Alpha集群管理器负责Alpha实例注册工作,管理Alpha中单个服务的执行情况, 并且为Omega提供一个及时更新的服务列表。 通过集群管理器用户可以轻松实现Alpha服务实例的启停操作,以及Alpha服务实例的滚动升级功能。目前这部分的模块还在设计开发中,欢迎对此有兴趣的朋友加入到我们的开发队伍中来。


    小结

    本文从分布式事务需要解决的问题入手,向大家介绍了建立在补充基础之上的基于服务的分布式事务的解决思路。接下来我们结合具体的示例介绍了完美的补偿(TCC)和非完美补偿(Saga)两种分布式事务协调协议,最后结合ServiceComb Pack的实现原理详细介绍了ServiceComb Pack的架构实现。



    在基于服务的分布式事务下篇中,我们将结合具体的示例向大家介绍TCC以及Saga分布式事务协调协议的交互细节,以及如何使用ServiceComb Pack编写TCC 以及Saga 应用。

    DataPipeline |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优

    数见科技 发表了文章 • 0 个评论 • 1230 次浏览 • 2018-09-04 18:36 • 来自相关话题

    胡夕,《Apache Kafka实战》作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。 前言 虽然目前Apache Kafka已经全面进 ...查看全部
    胡夕,《Apache Kafka实战》作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM、搜狗、微博等公司。国内活跃的Kafka代码贡献者。

    前言

    虽然目前Apache Kafka已经全面进化成一个流处理平台,但大多数的用户依然使用的是其核心功能:消息队列。对于如何有效地监控和调优Kafka是一个大话题,很多用户都有这样的困扰,今天我们就来讨论一下。

    一、Kafka综述

    在讨论具体的监控与调优之前,我想用一张PPT图来简单说明一下当前Kafka生态系统的各个组件。就像我前面所说,Kafka目前已经进化成了一个流处理平台,除了核心的消息队列组件Kafka core之外,社区还新引入Kafka Connect和Kafka Streams两个新的组件:其中前者负责Kafka与外部系统的数据传输;后者则负责对数据进行实时流处理计算。下图罗列了一些关键的Kafka概念。


    幻灯片2.jpg



    二、Kafka监控

    我打算从五个维度来讨论Kafka的监控。首先是要监控Kafka集群所在的主机;第二是监控Kafka broker JVM的表现;第三点,我们要监控Kafka Broker的性能;第四,我们要监控Kafka客户端的性能。这里的所指的是广义的客户端——可能是指我们自己编写的生产者、消费者,也有可能是社区帮我们提供的生产者、消费者,比如说Connect的Sink/Source或Streams等;最后我们需要监控服务器之间的交互行为。


    幻灯片3.jpg



    1.主机监控

    个人认为对于主机的监控是最重要的。因为很多线上环境问题首先表现出来的症状就是主机的某些性能出现了明显的问题。此时通常是运维人员首先发现了它们然后告诉我们这台机器有什么问题,对于Kafka主机监控通常是发现问题的第一步。这一页列出了常见的指标,包括CPU、内存、带宽等数据。需要注意的是CPU使用率的统计。可能大家听过这样的提法:我的Kafka Broker CPU使用率是400%,怎么回事?对于这样的问题,我们首先要搞清楚这个使用率是怎么观测出来的? 很多人拿top命令中的vss或rss字段来表征CPU使用率,但实际上它们并不是真正的CPU使用率——那只是所有CPU共同作用于Kafka进程所花的时间片的比例。举个例子,如果机器上有16个CPU,那么只要这些值没有超过或接近1600, 那么你的CPU使用率实际上是很低的。因此要正确理解这些命令中各个字段的含义。

    这页PPT右边给出了一本书,如果大家想监控主机性能的话,我个人建议这本《SystemsPerformance》就足够了。非常权威的一本书,推荐大家读一下。


    幻灯片4.jpg



    2.监控JVM

    Kafka本身是一个普通的Java进程,所以任何适用于JVM监控的方法对于监控Kafka都是相通的。第一步就是要先了解Kafka应用。比方说了解Kafka broker JVM的GC频率和延时都是多少,每次GC后存活对象的大小是怎样的等。了解了这些信息我们才能明确后面调优的方向。当然,我们毕竟不是特别资深的JVM专家,因此也不必过多追求繁复的JVM监控与调优。只需要关注大的方面即可。另外,如果大家时间很有限但又想快速掌握JVM监控与调优,推荐阅读《Java Performance》。

    3.Per-Broker监控

    首先要确保Broker进程是启动状态?这听起来好像有点搞笑,但我的确遇到过这样的情况。比如当把Kafka部署在Docker上时就容易出现进程启动但服务没有成功启动的情形。正常启动下,一个Kafka服务器起来的时候,应该有两个端口,一个端口是9092常规端口,会建一个TCP链接。还有一个端口是给JMX监控用的。当然有多台broker的话,那么controller机器会为每台broker都维护一个TCP连接。在实际监控时可以有意识地验证这一点。

    对于Broker的监控,我们主要是通过JMS指标来做的。用过Kafka的人知道,Kafka社区提供了特别多的JMS指标,其中很多指标用处不大。我这里列了一些比较重要的:首先是broker机器每秒出入的字节数,就是类似于我可以监控网卡的流量,一定要把这个指标监控起来,并实时与你的网卡带宽进行比较——如果发现该值非常接近于带宽的话,就证明broker负载过高,要么增加新的broker机器,要么把该broker上的负载均衡到其他机器上。

    另外还有两个线程池空闲使用率小关注,最好确保它们的值都不要低于30%,否则说明Broker已经非常的繁忙。 此时需要调整线程池线程数。

    接下来是监控broker服务器的日志。日志中包含了非常丰富的信息。这里所说的日志不仅是broker服务器的日志,还包括Kafka controller的日志。我们需要经常性地查看日志中是否出现了OOM错误抑或是时刻关注日志中抛出的ERROR信息。

    我们还需要监控一些关键后台线程的运行状态。个人认为有两个比较重要的线程需要监控:一个Log Cleaner线程——该线程是执行数据压实操作的,如果该线程出问题了,用户通常无法感知到,然后会发现所有compact策略的topic会越来越大直到占满所有磁盘空间;另一个线程就是副本拉取线程,即follower broker使用该线程实时从leader处拉取数据。如果该线程“挂掉”了,用户通常也是不知道的,但会发现follower不再拉取数据了。因此我们一定要定期地查看这两个线程的状态,如果发现它们意味终止,则去找日志中寻找对应的报错信息。


    幻灯片6.jpg



    4.Clients监控

    客户端监控这块,我这边会分为两个,分别讨论对生产者和消费者的监控。生产者往Kafka发消息,在监控之前我们至少要了解一下客户端机器与Broker端机器之间的RTT是多少。对于那种跨数据中心或者是异地的情况来说,RTT本来就很大,如果不做特殊的调优,是不可能有太高的TPS的。目前Kafka producer是双线程的设计机制,分为用户主线程和Sender线程,当这个Sender线程挂了的时候,前端用户是不感知的,但表现为producer发送消息失败,所以用户最好监控一下这个Sender线程的状态。


    幻灯片7.jpg



    还有就是监控PRODUCE请求的处理延时。一条消息从生产者端发送到Kafka broker进行处理,之后返回给producer的总时间。整个链路中各个环节的耗时最好要做到心中有数。因为很多情况下,如果你要提升生产者的TPS,了解整个链路中的瓶颈后才能做到有的放矢。后面PPT中我会讨论如何拆解这条链路。

    现在说说消费者。这里的消费者说的是新版本的消费者,也就是java consumer。


    幻灯片8.jpg



    社区已经非常不推荐再继续使用老版本的消费者了。新版本的消费者也是双线程设计,后面有一个心跳线程,如果这个线程挂掉的话,前台线程是不知情的。所以,用户最好定期监控该心跳线程的存活情况。心跳线程定期发心跳请求给Kafka服务器,告诉Kafka,这个消费者实例还活着,以避免coordinator错误地认为此实例已“死掉”从而开启rebalance。Kafka提供了很多的JMX指标可以用于监控消费者,最重要的消费进度滞后监控,也就是所谓的consumerlag。

    假设producer生产了100条消息,消费者读取了80条,那么lag就是20。显然落后的越少越好,这表明消费者非常及时,用户也可以用工具行命令来查lag,甚至写Java的API来查。与lag对应的还有一个lead指标,它表征的是消费者领先第一条消息的进度。比如最早的消费位移是1,如果消费者当前消费的消息是10,那么lead就是9。对于lead而言越大越好,否则表明此消费者可能处于停顿状态或者消费的非常慢,本质上lead和lag是一回事,之所以列出来是因为lead指标是我开发的,也算打个广告吧。

    除了以上这些,我们还需要监控消费者组的分区分配情况,避免出现某个实例被分配了过多的分区,导致负载严重不平衡的情况出现。一般来说,如果组内所有消费者订阅的是相同的主题,那么通常不会出现明显的分配倾斜。一旦各个实例订阅的主题不相同且每个主题分区数参差不齐时就极易发生这种不平衡的情况。Kafka目前提供了3种策略来帮助用户完成分区分配,最新的策略是黏性分配策略,它能保证绝对的公平,大家可以去试一下。

    最后就是要监控rebalance的时间——目前来看,组内超多实例的rebalance性能很差,可能都是小时级别的。而且比较悲剧的是当前无较好的解决方案。所以,如果你的Consumer特别特别多的话,一定会有这个问题,你监控一下两个步骤所用的时间,看看是否满足需求,如果不能满足的话,看看能不能把消费者去除,尽量减少消费者数量。

    5.Inter-Broker监控

    最后一个维度就是监控Broker之间的表现,主要是指副本拉取。Follower副本实时拉取leader处的数据,我们自然希望这个拉取过程越快越好。Kafka提供了一个特别重要的JMX指标,叫做备份不足的分区数,比如说我规定了这条消息,应该在三个Broker上面保存,假设只有一个或者两个Broker上保存该消息,那么这条消息所在的分区就被称为“备份不足”的分区。这种情况是特别关注的,因为有可能造成数据的丢失。《Kafka权威指南》一书中是这样说的:如果你只能监控一个Kafka JMX指标,那么就监控这个好了,确保在你的Kafka集群中该值是永远是0。一旦出现大于0的情形赶紧处理。


    幻灯片9.jpg



    还有一个比较重要的指标是表征controller个数的。整个集群中应该确保只能有一台机器的指标是1,其他全应该是0,如果你发现有一台机器是2或者是3,一定是出现脑裂了,此时应该去检查下是否出现了网络分区。Kafka本身是不能对抗脑裂的,完全依靠Zookeeper来做,但是如果真正出现网络分区的话,也是没有办法处理的,不如赶快fail fast掉。

    三、监控工具

    当前没有一款Kafka监控工具是公认比较优秀的,每个都有自己的特点但也有些致命的缺陷。我们针对一些常见的监控工具逐个讨论下。

    1.Kafka Manager

    应该说在所有免费的监控框架中,Kafka Manager是最受欢迎的。它最早由雅虎开源,功能非常齐全,展示的数据非常丰富。另外,用户能够在界面上执行一些简单的集群管理操作。更加令人欣慰的是,该框架目前还在不断维护中,因此使用Kafka manager来监控Kafka是一个不错的选择。

    2.Burrow

    Burrow是去年下半年开源,专门监控消费者信息的框架。这个框架刚开始开源的时候,我还对它还是寄予厚望的,毕竟是Kafka社区committer亲自编写的。不过Burrow的问题在于没有UI界面,不方便运维操作。另外由于是Go语言写的,你要用的话,必须搭建Go语言环境,然后编译部署,总之用起来不是很方便。还有就是它的更新不是很频繁,已经有点半荒废的状态,大家不妨一试。

    3.Kafka Monitor

    严格来说,它不是监控工具,它是专门做Kafka集群系统性测试用的。待监控的指标可以由用户自己设定,主要是做一些端到端的测试。比如说你搭了一套Kafka集群,想测试端到端的性能怎样:从发消息到消费者读取消息这一整体流程的性能。该框架的优势也是由Kafka社区团队写的,质量有保障,但更新不是很频繁,目前好像几个月没有更新了。

    4.Kafka Offset Monitor

    KafkaOffsetMonitor是我用的最早的一个Kafka监控工具,也是监控消费者位移,只不过那时候Kafka把位移保持在Zookeepr上。这个框架的界面非常漂亮,国内用的人很多。但是现在有一个问题,因为我们现在用了新版本的消费者,这个框架目前支持得的并不是特别好。而且还有一个问题就是它已经不再维护了,可能有1-2年没有任何更新了。

    5.Kafka Eagle

    这是国人自己开发的,我不知道具体是哪个大牛开发的,但是在Kafka QQ群里面很多人推崇,因为界面很干净漂亮,上面有很好的数据展现。

    6.Confluent Control Center

    Control Center是目前我能收集到的功能最齐全的Kafka监控框架了,只不过只有购买了Confluent企业版也有的,也就是说是付费的。

    综合来讲,如果你是Kafka集群运维操作人员,推荐先用Kafka Manager来做监控,后面再根据实际监控需求定制化开发特有的工具或框架。

    四、系统调优

    Kafka监控的一个主要的目的就是调优Kafka集群。这里罗列了一些常见的操作系统级的调优。

    首先是保证页缓存的大小——至少要设置页缓存为一个日志段的大小。我们知道Kafka大量使用页缓存,只要保证页缓存足够大,那么消费者读取消息时就有大概率保证它能够直接命中页缓存中的数据而无需从底层磁盘中读取。故只要保证页缓存要满足一个日志段的大小。

    第二是调优文件打开数。很多人对这个资源有点畏手畏脚。实际上这是一个很廉价的资源,设置一个比较大的初始值通常都是没有什么问题的。

    第三是调优vm.max_map_count参数。主要适用于Kafka broker上的主题数超多的情况。Kafka日志段的索引文件是用映射文件的机制来做的,故如果有超多日志段的话,这种索引文件数必然是很多的,极易打爆这个资源限制,所以对于这种情况一般要适当调大这个参数。

    第四是swap的设置。很多文章说把这个值设为0,就是完全禁止swap,我个人不建议这样,因为当你设置成为0的时候,一旦你的内存耗尽了,Linux会自动开启OOM killer然后随机找一个进程杀掉。这并不是我们希望的处理结果。相反,我建议设置该值为一个比较接近零的较小值,这样当我的内存快要耗尽的时候会尝试开启一小部分swap,虽然会导致broker变得非常慢,但至少给了用户发现问题并处理之的机会。

    第五JVM堆大小。首先鉴于目前Kafka新版本已经不支持Java7了,而Java 8本身不更新了,甚至Java9其实都不做了,直接做Java10了,所以我建议Kafka至少搭配Java8来搭建。至于堆的大小,个人认为6-10G足矣。如果出现了堆溢出,就提jira给社区,让他们看到底是怎样的问题。因为这种情况下即使用户调大heap size,也只是延缓OOM而已,不太可能从根本上解决问题。


    幻灯片18.jpg



    最后,建议使用专属的多块磁盘来搭建Kafka集群。自1.1版本起Kafka正式支持JBOD,因此没必要在底层再使用一套RAID了。

    五、Kafka调优的四个层面

    Kafka调优通常可以从4个维度展开,分别是吞吐量、延迟、持久性和可用性。在具体展开这些方面之前,我想先建议用户保证客户端与服务器端版本一致。如果版本不一致,就会出现向下转化的问题。举个例子,服务器端保存高版本的消息,当低版本消费者请求数据时,服务器端就要做转化,先把高版本消息转成低版本再发送给消费者。这件事情本身就非常非常低效。很多文章都讨论过Kafka速度快的原因,其中就谈到了零拷贝技术——即数据不需要在页缓存和堆缓存中来回拷贝。

    简单来说producer把生产的消息放到页缓存上,如果两边版本一致,可以直接把此消息推给Consumer,或者Consumer直接拉取,这个过程是不需要把消息再放到堆缓存。但是你要做向下转化或者版本不一致的话,就要额外把数据再堆上,然后再放回到Consumer上,速度特别慢。

    1.Kafka调优 – 吞吐量

    调优吞吐量就是我们想用更短的时间做更多的事情。这里列出了客户端需要调整的参数。前面说过了producer是把消息放在缓存区,后端Sender线程从缓存区拿出来发到broker。这里面涉及到一个打包的过程,它是批处理的操作,不是一条一条发送的。因此这个包的大小就和TPS息息相关。通常情况下调大这个值都会让TPS提升,但是也不会无限制的增加。不过调高此值的劣处在于消息延迟的增加。除了调整batch.size,设置压缩也可以提升TPS,它能够减少网络传输IO。当前Lz4的压缩效果是最好的,如果客户端机器CPU资源很充足那么建议开启压缩。


    幻灯片21.jpg



    对于消费者端而言,调优TPS并没有太好的办法,能够想到的就是调整fetch.min.bytes。适当地增加该参数的值能够提升consumer端的TPS。对于Broker端而言,通常的瓶颈在于副本拉取消息时间过长,因此可以适当地增加num.replica.fetcher值,利用多个线程同时拉取数据,可以加快这一进程。

    2.Kafka调优 – 延时

    所谓的延时就是指消息被处理的时间。某些情况下我们自然是希望越快越好。针对这方面的调优,consumer端能做的不多,简单保持fetch.min.bytes默认值即可,这样可以保证consumer能够立即返回读取到的数据。讲到这里,可能有人会有这样的疑问:TPS和延时不是一回事吗?假设发一条消息延时是2ms,TPS自然就是500了,因为一秒只能发500消息,其实这两者关系并不是简单的。因为我发一条消息2毫秒,但是如果把消息缓存起来统一发,TPS会提升很多。假设发一条消息依然是2ms,但是我先等8毫秒,在这8毫秒之内可能能收集到一万条消息,然后我再发。相当于你在10毫秒内发了一万条消息,大家可以算一下TPS是多少。事实上,Kafka producer在设计上就是这样的实现原理。

    3.Kafka调优 –消息持久性

    消息持久化本质上就是消息不丢失。Kafka对消息不丢失的承诺是有条件的。以前碰到很多人说我给Kafka发消息,发送失败,消息丢失了,怎么办?严格来说Kafka不认为这种情况属于消息丢失,因为此时消息没有放到Kafka里面。Kafka只对已经提交的消息做有条件的不丢失保障。

    如果要调优持久性,对于producer而言,首先要设置重试以防止因为网络出现瞬时抖动造成消息发送失败。一旦开启了重试,还需要防止乱序的问题。比如说我发送消息1与2,消息2发送成功,消息1发送失败重试,这样消息1就在消息2之后进入Kafka,也就是造成乱序了。如果用户不允许出现这样的情况,那么还需要显式地设置max.in.flight.requests.per.connection为1。


    幻灯片24.jpg



    本页PPT列出的其他参数都是很常规的参数,比如unclean.leader.election.enable参数,最好还是将其设置成false,即不允许“脏”副本被选举为leader。

    4.Kafka调优 –可用性

    最后是可用性,与刚才的持久性是相反的,我允许消息丢失,只要保证系统高可用性即可。因此我需要把consumer心跳超时设置为一个比较小的值,如果给定时间内消费者没有处理完消息,该实例可能就被踢出消费者组。我想要其他消费者更快地知道这个决定,因此调小这个参数的值。

    六、定位性能瓶颈

    下面就是性能瓶颈,严格来说这不是调优,这是解决性能问题。对于生产者来说,如果要定位发送消息的瓶颈很慢,我们需要拆解发送过程中的各个步骤。就像这张图表示的那样,消息的发送共有6步。第一步就是生产者把消息放到Broker,第二、三步就是Broker把消息拿到之后,写到本地磁盘上,第四步是follower broker从Leader拉取消息,第五步是创建response;第六步是发送回去,告诉我已经处理完了。


    幻灯片26.jpg



    这六步当中你需要确定瓶颈在哪?怎么确定?——通过不同的JMX指标。比如说步骤1是慢的,可能你经常碰到超时,你如果在日志里面经常碰到request timeout,就表示1是很慢的,此时要适当增加超时的时间。如果2、3慢的情况下,则可能体现在磁盘IO非常高,导致往磁盘上写数据非常慢。倘若是步骤4慢的话,查看名为remote-time的JMX指标,此时可以增加fetcher线程的数量。如果5慢的话,表现为response在队列导致待的时间过长,这时可以增加网络线程池的大小。6与1是一样的,如果你发现1、6经常出问题的话,查一下你的网络。所以,就这样来分解整个的耗时。这是到底哪一步的瓶颈在哪,需要看看什么样的指标,做怎样的调优。

    七、Java Consumer的调优

    最后说一下Consumer的调优。目前消费者有两种使用方式,一种是同一个线程里面就直接处理,另一种是我采用单独的线程,consumer线程只是做获取消息,消息真正的处理逻辑放到单独的线程池中做。这两种方式有不同的使用场景:第一种方法实现较简单,因为你的消息处理逻辑直接写在一个线程里面就可以了,但是它的缺陷在于TPS可能不会很高,特别是当你的客户端的机器非常强的时候,你用单线程处理的时候是很慢的,因为你没有充分利用线程上的CPU资源。第二种方法的优势是能够充分利用底层服务器的硬件资源,TPS可以做的很高,但是处理提交位移将会很难。

    最后说一下参数,也是网上问的最多的,这几个参数到底是做什么的。第一个参数,就是控制consumer单次处理消息的最大时间。比如说设定的是600s,那么consumer给你10分钟来处理。如果10分钟内consumer无法处理完成,那么coordinator就会认为此consumer已死,从而开启rebalance。

    Coordinator是用来管理消费者组的协调者,协调者如何在有效的时间内,把消费者实例挂掉的消息传递给其他消费者,就靠心跳请求,因此可以设置heartbeat.interval.ms为一个较小的值,比如5s。

    八、Q & A

    Q1:胡老师在前面提到低版本与高版本有一个端口的问题,我想问一下高版本的、低版本的会有这个问题吗?

    A1:会有。

    Q2:两种模式,一个是Consumer怎么做到所有的partition,在里面做管理的。会有一个问题,某个Consumer的消费比较慢,因为所有的Partition的消费都是绑定在一个线程。一个消费比较慢,一个消费比较快,要等另一个。有没有一种方案,消费者比较慢的可以暂定,如果涉及到暂停的话,频繁的暂定耗费的时间,是不是会比较慢?

    A2:一个线程处理所有的分区。如果从开销来讲并不大,但是的确会出现像你说的,如果一个消费者定了100个分区,目前我这边看到的效果,某段时间内有可能会造成某些分区的饿死,比如说某些分区长期得不到数据,可能有一些分区不停的有数据,这种情况下的确有可能情况。但是你说的两种方法本身开销不是很大,因为它就是内存当中的结构变更,就是定位信息,如果segment,就把定位信息先暂时关掉,不涉及到很复杂的数据结构的变更。

    Q3:怎么决定顺序呢?

    A3:这个事情现在在Broker端做的,简单会做轮询,比如说有100个分区,第一批随机给你一批分区,之后这些分区会排到整个队列的末尾,从其他的分区开始给你,做到尽量的公平。

    Q4:消费的时候会出现数据倾斜的情况,这块如何理解?

    A4:数据倾斜。这种情况下发生在每个消费者订阅信息不一样的情况下,特别容易出现数据倾斜。比如说我订阅主题123,我订阅主题456,我们又在同一个组里面这些主题分区数极不相同,很有可能出现我订阅了10个分区,你可能订阅2个分区。如果你用的是有粘性的分配策略,那种保证不会出现超过两个以上相差的情况。这个策略推出的时间也不算短了,是0.11版本推出来的。

    点击这里,免费申请DataPipeline产品试用

    谈谈Apache Mesos和Mesosphere DCOS:历史、架构、发展和应用

    colstuwjx 发表了文章 • 1 个评论 • 15886 次浏览 • 2015-09-20 07:59 • 来自相关话题

    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。 ## Mesos 发展史 Mesos 是 ...查看全部
    【编者的话】Mesos 是一个很年轻的开源项目,它的理念是怎样的? 它的整体架构以及服务对象又是什么? 基于此的 Mesosphere DCOS 又是如何定位的? 本文作者就这些话题展开了探讨。
    ## Mesos 发展史
    Mesos 是一个早在2009年由 Benjamin Hindman、Andy Konwinski、Matei Zaharia、Ali Ghodsi、Anthony D. Joseph、Randy Katz、Scott Shenker和Ion Stoica几人联合发起的伯克利大学研究项目。Benjamin 随后将其引入 Twitter,而如今它已经完美的运行在他们的数据中心上, Benjamin 本人也在不久之后成为了 Mesosphere 的首席架构师,正是它构建了 Mesosphere 数据中心操作系统(DCOS)。

    Mesos 的设计宗旨在于尝试和提高集群的利用效率和性能,他们认为对于数据中心资源的单纯静态划分和使用的这样一个方式是值得重新考量的,举个例子来说:

    我们假设你的数据中心里拥有9个主机:
    1.png

    如果把它静态的划分开来,并且指定每三个主机承载一个应用,这样一来总共是3个应用(这里是Hadoop、Spark 和 Ruby on Rails)。
    2.png

    显而易见的一个问题是这些主机的资源利用率并不会很高;
    3.png

    因此如果你想使用全部的资源,即这里例子中的全部9台主机,那么就需要将其抽象成一个共享资源池,而你可以按需计划配置,这样的话,利用率自然可以得到相应的提升;
    4.png


    Mesos团队的第二个观点在于他们觉得需要为分布式系统量身定制一套新的系统,换句话说,他们觉得MapReduce并不是适用于所有的场景(这也导致了Spark的诞生,而它又是另外一个故事了),而我们需要一个新的更简单和更具有通用性的专为分布式系统提供服务的这样一个框架。
    ## Mesos 框架(分布式系统)到底是什么?
    一般来说,一个分布式系统你需要有一个Coordinator(调度器)和 多个Worker(执行任务)。调度器以同步(分布式)的方式运行进程/任务,处理程序错误(容错),并且负责优化性能(即弹性伸缩)。换句话说,它负责协调在数据中心去实际执行你想要运行的代码(不需要是一个完整的程序,它也可以是某些种类的运算)。正如之前所提到的那样,Mesos将其称之为联合调度。
    5.png

    或者也可以这么说,Mesos是一个带有调度器的分布式系统。
    6.png

    那么Mesos的真正定位是什么呢? 当你尝试去执行它的任务时你可以理解为它实际上就是机器和调度器之间的一层抽象。

    因此在Mesos里,调度器是和Mesos层(通过API等)通信,而不是直接跟物理机器打交道。Mesos这里通过这样的方式尝试解决的即是资源的静态划分问题,这意味着你不再需要针对每个特定的运行时分配一个对应的调度器去决定实际去执行它的workers,而取而代之的是,你有一个调度器去和Mesos通信,而它会反过来依据整个资源池的剩余资源做调度。
    7.png

    这样做带来的最显而易见的好处就是你可以在一批机器上运行多个不同的分布式系统并且更有效的(不再是静态划分)动态划分和共享这些资源。
    8.png

    其次,之所以这样抽象设计的另外一个重要原因在于它能够提供一个通用功能集(故障检测、分布式任务、任务启动、任务监控、结束任务、清理任务等),这样一来就无需每个分布式系统都各自重复的去实现这样一套逻辑。
    ## Mesos 适合作为数据中心的哪一层的抽象?
    Mesos 这一层抽象实现的目的即是想要尝试通过使用并更好的调度资源使得运行在其之上的这些框架变得更加易于构建和运行。
    9.png

    IaaS的抽象的是机器,例如你给它指定一个数字,它便会生成一堆的机器而这也可以看作是Mesos概念模型更底层化的一个抽象。PaaS则考虑的更多是部署和管理应用/服务,它并不关心底层的那些基础架构,而你可以把它看作是Mesos概念模型的一个更高层面的抽象。在交互方面,PaaS可能是和开发者直接交互,而Mesos则是以API的形式和软件程序交互。

    换句话说,你可以基于Mesos之上构建一个Paas系统(例如像Marathon - 它好像任何地方都比一个真正的Paas系统更像PaaS),同时你可以在一个IaaS上运行Mesos(例如OpenStack)。

    如果你将你的Mesos运行在一个组合系统(例如就像Openstack + 物理硬件 + 虚拟机)之上,那么你可以很直观的再次体会到动态划分资源的好处,那便是你能够跨越这些底层组件而直接的去管理和计划你的工作负载,某种意义上来说,你可以认为Mesos类似于是一个数据中心的内核,即它负责将物理机器抽象成资源,从而使得你能够忽略底层组件的存在,通过消费Mesos的抽象资源来构建分布式系统。

    因此我们可以说,Apache Mesos是为构建和运行其它分布式系统(例如像Spark)提供服务的分布式系统。
    ## Mesos架构内幕
    在 Mesos 里,一个框架程序(或者说分布式系统)发起的一次请求会在被接收到的那个时刻由调度器承接和分配。这跟传统分布式系统一般人为发起请求的方式不太一样(再强调一下,Mesos将会让框架程序发起请求,而不是人工操作),传统的方式即需要在人为发起请求时设定好需要分配的特定资源,然后再去真正请求和获取这些资源,这类情况中最典型的莫过于需求场景的变换(设想在Map/Reduce的场景下,比如在Map和Reduce阶段切换之际产生的一个需求资源的变化)
    10.png

    与传统分布式系统不一样的是,Mesos 将会立马为其分配所能分配的最大资源,而不是傻傻的在那等到满足该请求的资源完成/完全到位(在这里它想要实现的便是在绝大多数情况下十分奏效的无阻塞式资源分配策略,即你无须立马消费预期请求的全量资源的这样的情景)。

    当然,现在框架类应用(分布式系统)也可以使用Mesos提供的资源完成他们自己的调度,这便是所谓的 “二次资源调度”。
    11.png

    最终达到的效果即是你下发的一个任务可以在整个数据中心的任意一个地方提交并且运行。

    构建这样的“二次资源调度”系统的原因在于它可以在同一时间内支持多个分布式系统。同样以上面的例子来解释,Mesos为Spark提供和分配所需的资源。而这里,Spark则负责决策和分配这些可用资源去运行实际任务(即因为可用的资源得以满足需求,所以我才能够实际去运行这些map任务)。
    12.png

    所以一旦一个任务被框架应用提交到Mesos,那么这些任务就必须被实际执行。Mesos master 负责指派任务给每个slave,而每个slave通过上面跑着的agent来管理和运行这些任务。(这即是说如果这个任务是对应的一个命令,那么它会去执行它,如果它需要一些特定的资源来完成这个任务,比如像jar包,那么它会先获取所需的资源,然后在一个沙盒里执行它,最后才发起这个任务)

    或者说你也可以这样,框架应用可以通过一个执行器(框架应用需要一个中间层,这个中间层可以用来多线程执行任务)来灵活的决定它想要执行的任务。
    13.png

    为了保证资源的相对隔离性,Mesos 对 Kernel的cgroups和namespaces 提供了内置的原生支持,当然你也可以将一个Docker容器当做一个任务去运行。这样一来,它便给你提供了一个多租户的(框架)资源池的访问机制(跨主机和主机内部的进程间通信)。

    你可以预请求你所需的资源,当然这样你也就回到了资源固定划分的时代。如果你有一些有状态的应用,那么你需要预定一些资源(这类任务通常需要在同一台主机上运行)并且需要一些持久化的存储卷(数据需要能够支持故障迁移和恢复),而这类需求Mesos同样能够支持。
    ## Mesosphere DCOS
    DCOS(数据中心操作系统)即是Mesos的“核心”与其周边的服务及功能组件所组成的一个生态系统。例如像mesos-dns这样的插件模块,类似一个CLI,一个GUI又或者是提供你想运行的所有的包的仓库等工具,以及像Marathon(又名分布式的init)、Chronos(又名分布式的cron)这样的框架等等。

    顾名思义,它即是意味着一个跨越在数据中心或者云环境所有主机之上的操作系统。DCOS 可以运行在任意的现代Linux环境,公有或私有云,虚拟机甚至是裸机环境。(当前所支持的平台有:亚马逊AWS、谷歌GCE、微软Azure、OpenStack、Vmware、RedHat、CentOS、CoreOS以及Ubuntu)。迄今为止,DCOS 在其公有仓库上已经提供了多达40余种服务组件(Hadoop、Spark、Cassandra、Jenkins、Kafka、MemSQL等等)。

    另附Mesosphere 集群操作系统(DCOS)入门视频

    原文链接:Introduction to Apache Mesos and Mesosphere DCOS (翻译:吴佳兴)

    Fenzo:来自Netflix基于Java语言的Mesos调度器

    edge_dawn 发表了文章 • 0 个评论 • 8658 次浏览 • 2015-09-02 11:55 • 来自相关话题

    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。 Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易 ...查看全部
    【编者的话】Fenzo是一个在Mesos框架上应用的通用任务调度器。它可以让你通过实现各种优化策略的插件,来优化任务调度,同时这也有利于集群的自动缩放。

    Netflix有着数百万的用户,要为这个数量级的用户提供可靠的服务并不是一件容易的事情。Netflix是由几十个分布式的服务支撑的,其中每个服务都是产品不可或缺的一部分,并且都在不断迭代着。我们需要从两个方面来优化这些服务,一个是用户体验,另外一个是服务的整体性能以及成本。为此,我们很高兴向大家介绍Fenzo这款开源软件,它是一个使用Java语言编写的Apache Mesos框架的调度器。Fenzo负责管理Netflix内部所有服务的调度和资源分配。

    Fenzo现在已经开源,读者可以在GitHub中了解关于它的更多信息。
    ##为什么使用Fenzo?
    之所以要重新开发一个新框架,而不是利用社区中已有的框架,是因为我们考虑到两个方面,一是调度优化,另一个是希望能够根据资源使用情况来自动缩放集群,这两个方面下文中都将会详细解释。Fenzo更适合管理生命周期短暂(ephemerality)的应用,Netflix的用例包括实时操作的响应式数据流系统以及管理基于容器的应用部署。

    在Netflix中,一天业务数据的变化非常大,如果按照业务峰值时所需要的资源来配置集群资源,那将会非常浪费,并且当出现某些热点事件时,我们的系统是无法应对这样的突发情况的。我们需要利用云的弹性以及基于动态负载来缩放集群。

    虽然扩大集群似乎看起来相对比较容易,但是当集群中可用资源低于某一个阈值时,缩小集群就会带来新的挑战。当存在长期运行的任务,并且不能随便被终止时,例如拓扑结构的状态流处理耗时重构,那么调度器如果想让集群缩小,就必须让这样的主机上的所有任务几乎同时终止。
    ##调度策略
    任务调度需要优化资源分配以最大化预期目标。不同的资源分配方式会对结果产生不同的影响, 包括可伸缩性、性能等方面,因此,高效的资源分配方式对于调度管理器来说是至关重要的。比如,选择分配方式时,逐个评估所有的可用资源以及任务,这在计算方面根本吃不消。
    ##调度模型
    我们的设计专注于大规模部署具有多重约束和优化资源需求的多样化的任务与资源。如果评估最优化分配需要很长时间,就可能造成两个问题:

    • 资源闲置,等待新的任务
    • 任务启动时间增加
    Fenzo采用了能够快速推动我们到正确方向的方式,而不是每次都找出最优的调度分配集。从概念上讲,我们认为任务有一个紧迫因素决定多久需要一个任务分配,以及一个适合度因素决定是否适合一个给定的主机。
    FitnessUrgency.png
    如果任务是非常紧迫的,或者如果它非常适合于一个给定的资源,我们继续并分配资源给这个任务。 否则,我们继续让任务挂起,直到紧迫性增加或发现另一台主机具有较大的适合度。##权衡调度速度与优化Fenzo能够为你动态的选择速度与最优分配。它跨多个主机采用一个评价最优分配策略,但是只有当适合度被认为是“足够好”才能获得这种策略。然而用户为足够好的合适度定义了阈值以控制调度速度,用一个合适度评估插件来表示集群任务分配的最优化和最高级别的调度对象。这个合适度计算器由多个其他合适度计算器组成,代表一个多重面向对象。##任务约束Fenzo任务使用可选的软或硬约束影响分配来实现与其他任务的locality和/或资源的亲和力。软约束满足best efforts基准,结合合适度计算器来给可能分配的主机打分,而硬约束则必须满足和充当一个资源选择过滤器。Fenzo把所有相关的集群状态信息提供给适合度计算器和约束插件,这样就可以优化基于作业、资源和时间的各方面的任务。##封装和约束插件Fenzo目前为封装提供了内置的基于CPU、内存以及网络带宽资源或者是他们集合的适合度计算器。一些内置的约束用于解决资源类型的常见位置公共用例,将一组任务分配给不同的主机,平衡跨越给定主机属性的任务,例如可用区、主机位置等。你可以通过提供的新插件定制合适度计算器和约束。##集群自动缩放Fenzo支持使用两种互补的策略集群自动缩放:
    • 基于阈值
    • 基于资源短缺分析

    基于阈值的自动缩放,用户可以指定每个被用在集群当中的host组(如EC2自动缩放组,ASG)。例如,有可能是使用一个EC2实例类型的计算密集型工作负载创建一个ASG,也可以使用网络密集型工作负载创建另一个ASG。每一条规则有助于保持配置可用于快速启动新作业的一定数量的可用主机。

    利用资源短缺分析试图来估计主机数目,以满足待处理负载。这补充了在需求激增当中基于集群扩大的规则。 Fenzo的自动缩放还补充了预测自动缩放系统,如Netflix Scryer。
    ##在Netflix上的应用
    在Netflix上,Fenzo目前被使用在2个Mesos框架中,用于各种使用案例,包括长时间运行的服务和批处理作业。我们已经看到调度器在多重约束和自定义的适合度计算器的情况下分配资源比较快。此外,Fenzo允许我们根据当前的需求,而不是按照需求的峰值集群规模来调整集群大小。

    下表显示了我们观测到的在我们其中的一个集群中的每个调度运行平均时间和最大时间。每个调度运行可能会试图分配资源给多个任务,而运行时间非常依赖于需要分配的任务数、约束的数量和种类以及从中选择资源的主机数量。
    QQ图片20150902114723.png

    下图显示了在集群中几天内Mesos slave的数量变化,作为Fenzo的自动缩放行为的体现,表示3X在最大和最小数值上的不同。
    TitanAutoscaling2.png

    ##Fenzo 在Mesos 框架上的使用
    FenzoUsageDiagram.png

    上面简易的图示告诉我们Fenzo怎样被Apache Mesos框架使用。Fenzo任务调度提供了一个没有与Mesos自身进行交互的调度核心。Mesos的框架和接口在新的资源和任务状态更新上得到回传,同时它让Mesos driver 启动基于Fenzo的分配任务。
    ##总结
    Fenzo已经成为云平台上的一个很好的帮手,它在Mesos上给我们一个高级别的控制任务调度,而且使我们在机器效率与作业运行快速化方面达到一个平衡。除此之外,Fenzo支持集群的自动缩放和封装。通过编写你自己的插件可以实现自定义调度器。

    源代码在Netflix GitHub上可以找到,资源库当中包含了样本框架教大家如何使用Fenzo,而且在JUnit tests中给出了不同类型的例子包括写自定义的适应度计算器和约束条件。Fenzo wiki包含详细的文档来帮助大家开始学习Fenzo。

    原文链接: Fenzo: OSS Scheduler for Apache Mesos Frameworks(翻译:edge_dawn)

    在Docker上运行Apache Kafka

    Azriel 发表了文章 • 1 个评论 • 23539 次浏览 • 2015-08-05 23:28 • 来自相关话题

    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka ...查看全部
    【编者的话】在研究Apache Kafka和Docker时,作者发现Docker是一个非常神奇的技术,它将开发过程简化的如此完美。又因为有wurstmeister/kafka和wurstmeister/zookeeper这两个镜像,运行Apache Kafka和使用Docker是那样的轻松,还有比这更让人觉得兴奋的事情么!让我们完全从安装、维护机器和软件中解脱出来。

    一直很想鼓捣Apache Kafka,但由于我想鼓捣的事太多,Kafka一直没能得到临幸。直到最近,有人要我尝试下这个“中间人”,看看这东西是否能满足一个项目的需求----其实是两个项目。可以想下我当时的表情。

    我编译了Apache Kafka的源代码,将其连接到了Spark Streaming并尝试回答StackOverflow上的一些问题(在使用Scala的Flink中怎样使用Kafka?怎样用jmxtrans见识Kafka中间人?),更不用说阅读繁多的相关文章和看视频。我对什么场景最适合用Apache Kafka有了清醒地认识。

    在和Codilime里的团队开发DeepSense.io平台时,我们只用Ansible自动化部署。我们也尝试过DockerVagrant,都是为了简化DeepSense.io的部署。

    这时就涉及到了两个需求:为了三个项目而研究Apache Kafka和Docker(包括其他工具)!很神奇,不是么?我终于发现了Docker可以让开发产品和部署变得多么简单。只有亲眼所见时,我才意识到Docker竟能如此简化我的开发过程。现在我会把一切Docker化。得知wurstmeister/kafkawurstmeister/zookeeper镜像时,我不可能更幸福了。运行Apache Kafka和使用Docker最终变得如此轻松愉悦。

    然后我就想我应该分享这份热爱,不仅我,所有人都可以从中受益。

    由于我在Mac OS X上,所以用Docker运行Apache Kafka的步骤依赖于boot2docker----一款轻量级Linux,供一些不原生支持Docker的平台,比如前面提到的Mac OS X以及Windows。

    你将会用到wurstmeister/kafkawurstmeister/zookeeper两个镜像。

    你可以在后台或前台独立于镜像运行容器。视你的Unix技术而言,也就是一两个终端的事。这里就对Apache Kafka和Apache Zookeeper各使用一个终端。我会在另一篇博客中解释Apache Zookeeper的作用。

    下面是用Docker运行Apache Kafka的步骤,假设你已经装好了``boot2docker``和``docker``。
    ➜  ~  boot2docker version
    Boot2Docker-cli version: v1.7.1
    Git commit: 8fdc6f5

    ➜ ~ docker --version
    Docker version 1.7.1, build 786b29d

    我很喜欢homebrew,强烈推荐给Mac OS X的用户。很多程序包只需``brew install``一下就能用,包括``docker``和``boot2docker``。
    # 在两个镜像上运行Kafka
    1,(仅适用Mac OS X和Windows用户)执行``boot2docker up``在Mac OS上启动微型Linux内核。
    ➜  ~  boot2docker up
    Waiting for VM and Docker daemon to start...
    .o
    Started.
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    To connect the Docker client to the Docker daemon, please set:
    export DOCKER_HOST=tcp://192.168.59.104:2376
    export DOCKER_CERT_PATH=/Users/jacek/.boot2docker/certs/boot2docker-vm
    export DOCKER_TLS_VERIFY=1

    2,(仅适用Mac OS X和Windows用户)执行``$(boot2docker shellinit)``设置好终端,让``docker``知道微型Linux内核运行在哪儿(通过``boot2docker``)。为了设置上面的``export``,你必须在所有打开的运行Docker终端中重复这一步骤。如果你遇到``docker``命令的通信问题,记着这一步。
     ➜  ~  $(boot2docker shellinit)
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/ca.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/cert.pem
    Writing /Users/jacek/.boot2docker/certs/boot2docker-vm/key.pem

    3,执行``docker ps``确保为Docker配置好了终端。

    ➜ ~ docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    这时还没有容器运行。一旦首先启动Zookeeper的容器就会有变化,接着是Kafka。

    4,在Docker Hub上创建账号并运行``docker login``保存证书。你不必重复通过``docker pull``从Docker镜像的公用中心下载镜像,把Docker Hub看作存储Docker镜像的GitHub。参考文档使用Docker Hub获得最新信息。

    5,执行``docker pull wurstmeister/kafka``从Docker Hub下载Zookeeper镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/zookeeper
    Pulling repository wurstmeister/zookeeper
    a3075a3d32da: Download complete
    ...
    840840289a0d: Download complete
    e7381f1a45cf: Download complete
    5a6fc057f418: Download complete
    Status: Downloaded newer image for wurstmeister/zookeeper:latest

    你会看到各层的哈希打印在控制台里,符合预期。

    6,执行``docker pull wurstmeister/kafka``从Docker Hub下载Kafka镜像(可能需要几分钟)
     ➜  ~  docker pull wurstmeister/kafka
    latest: Pulling from wurstmeister/kafka
    428b411c28f0: Pull complete
    ...
    422705fe88c8: Pull complete
    02bb7ca441d8: Pull complete
    0f9a08061516: Pull complete
    24fc32f98556: Already exists
    Digest: sha256:06150c136dcfe6e4fbbf37731a2119ea17a953c75902e52775b5511b3572aa1f
    Status: Downloaded newer image for wurstmeister/kafka:latest

    7,在命令行中执行``docker images``验证``wurstmeister/kafka``和``wurstmeister/zookeeper``两个镜像已下载。
     ➜  ~  docker images
    REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
    wurstmeister/kafka latest 24fc32f98556 3 weeks ago 477.6 MB
    wurstmeister/zookeeper latest a3075a3d32da 9 months ago 451 MB

    8,现在可以在一个终端里运行``docker run --name zookeeper -p 2181 -t wurstmeister/zookeeper``引导启动Zookeeper。如果你在Mac OS X或Windows上,记得``$(boot2docker shellinit)``。
     ➜  ~  docker run --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    JMX enabled by default
    Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    2015-07-17 19:10:40,419 [myid:] - INFO [main:QuorumPeerConfig@103] - Reading configuration from: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg
    ...
    2015-07-17 19:10:40,452 [myid:] - INFO [main:ZooKeeperServer@773] - maxSessionTimeout set to -1
    2015-07-17 19:10:40,464 [myid:] - INFO [main:NIOServerCnxnFactory@94] - binding to port 0.0.0.0/0.0.0.0:2181

    现在ZooKeeper在监听2181端口。用Docker(或者Mac OS上的Boot2Docker)的IP地址远程连接确认下。
     ➜  ~  telnet `boot2docker ip` 2181
    Trying 192.168.59.103...
    Connected to 192.168.59.103.
    Escape character is '^]'.

    9,在另一个终端里执行
    docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka

    记得``$(boot2docker shellinit)``,如果你在Mac OS X或Windows上。
     ➜  ~  docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092 --link zookeeper:zk -t wurstmeister/kafka
    [2015-07-17 19:32:35,865] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property advertised.port is overridden to 9092 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,891] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
    ...
    [2015-07-17 19:32:35,894] INFO Property zookeeper.connect is overridden to 172.17.0.5:2181 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,895] INFO Property zookeeper.connection.timeout.ms is overridden to 6000 (kafka.utils.VerifiableProperties)
    [2015-07-17 19:32:35,924] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,925] INFO [Kafka Server 1], Connecting to zookeeper on 172.17.0.5:2181 (kafka.server.KafkaServer)
    [2015-07-17 19:32:35,934] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2015-07-17 19:32:35,939] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
    ...
    [2015-07-17 19:32:36,093] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    [2015-07-17 19:32:36,095] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
    [2015-07-17 19:32:36,146] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
    [2015-07-17 19:32:36,172] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2015-07-17 19:32:36,253] INFO Registered broker 1 at path /brokers/ids/1 with address 61c359a3136b:9092. (kafka.utils.ZkUtils$)
    [2015-07-17 19:32:36,270] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    [2015-07-17 19:32:36,318] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

    现在你的电脑上运行着依托Docker的Apache Kafka,你是它的的开心用户。用``docker ps``查看容器状态。
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    0b34a9927004 wurstmeister/kafka "/bin/sh -c start-ka 2 minutes ago Up 2 minutes 0.0.0.0:32769->9092/tcp kafka
    14fd32558b1c wurstmeister/zookeeper "/bin/sh -c '/usr/sb 4 minutes ago Up 4 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32768->2181/tcp zookeeper

    10,要结束你的Apache Kafka旅程时,用``docker stop kafka zookeeper``(或``docker stop $(docker ps -aq)``,如果运行的容器只有``kafka``和``zookeeper``)``docker stop``容器。
     ➜  ~  docker stop kafka zookeeper
    kafka
    zookeeper

    之后运行``docker ps``会显示没有正在运行的容器:
     ➜  ~  docker ps
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES

    现在没有正在运行的容器是因为他们被关闭了,这些容器依然可以被再次启动----使用``docker ps -a``查看可以使用的容器。
     ➜  ~  docker ps -a
    CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
    7dde25ff7ec2 wurstmeister/kafka "/bin/sh -c start-ka 15 hours ago Exited (137) 16 seconds ago kafka
    b7b4b675b9c0 wurstmeister/zookeeper "/bin/sh -c '/usr/sb 16 hours ago Exited (137) 5 seconds ago zookeeper

    11,最后,用``boot2docker down``停止``boot2docker``守护进程(仅对于Mac OS X和Windows用户)。
    #结语
    利用wurstmeister/kafkawurstmeister/zookeeper这两个镜像,不用过多修改本地的工作站环境去安装必需的包诸如Apache ZooKeeper,你就能运行Apache Kafka。除了Docker本身(和Boot2Docker,如果你恰好在Mac OS上),你不必担心其它升级软件和依赖,从而将你从花时间安装和维护你的机器和软件中解脱出来。还有,Docker镜像可以被部署到其它机器,确保一致的内部软件环境。

    你可以在博客下方的评论区留言,或者邮件联系我,告诉我你对这个话题的看法。记得在Twitter上关注作者@jaceklaskowski

    原文链接:Apache Kafka on Docker(翻译:Azriel 审校:魏小红)