概要
本文中我们将讨论如何借助 Kafka 实现分布式消息管理,使用事件溯源(Event Sourcing)模式实现原子化数据处理,使用CQRS模式(Command-Query Responsibility Segregation )实现查询职责分离,使用消费者群组解决单点故障问题,理解分布式协调框架Zookeeper的运行机制。整个应用的代码实现使用Go语言描述。
- 第一部分 引子、环境准备、整体设计及实现
- 第二部分 消息消费者及其集群化
- 第三部分 测试驱动开发、Docker部署和持续集成
第一部分 引子、环境准备、整体设计及实现
为什么需要微服务
微服务本身并不算什么新概念,它要解决的问题在软件工程历史中早已经有人提出:解耦、扩展性、灵活性,解决“烂架构”膨胀后带来的复杂度问题。
Conway’s law(康威定律)
Any organization that designs a system (defined broadly) will produce a design whose structure is a copy of the organization’s communication structure.(任何组织在设计一套系统(广义概念上的系统)时,所交付的设计方案在结构上都与该组织的通信结构保持一致)
— Melvyn Conway, 1967
《人月神话》:Adding manpower to a late software project makes it later —Fred Brooks, (1975)
为了赶进度加程序员就像用水去灭油锅里的火一样,原因在于:沟通成本 = n(n-1)/2,沟通成本随着项目或者组织的人员增加呈指数级增长。很多项目在经过一段时间的发展之后,都会有不少恐龙级代码,无人敢挑战。比如一个类的规模就多达数千行,核心方法近千行,大量重复代码,每次调整都以失败告终。庞大的系统规模导致团队新成员接手困难,项目组人员增加导致的代码冲突问题,系统复杂度的增加导致的不确定上线风险、引入新技术困难等。
微服务 (Microservices)是解决这些困难的众多方案之一。它本质上是一种软件架构风格,它是以专注于单一责任与功能的小型功能区块 (Small Building Blocks) 为基础,利用模组化的方式组合出复杂的大型应用程序,各功能区块使用与语言无关 (Language-Independent/Language agnostic) 的 API 集相互通讯。
Event Sourcing(事件溯源)
真正构建一个微服务是非常具有挑战性的。其中一个最重要的挑战就是原子化————如何处理分布式数据,如何设计服务的粒度。例如,常见的客户、工单场景,如果拆分成两个服务,查询都变成了一个难题:1
2
3
4
5select * from order o, customer c
where o.customer_id = c.id
and o.gross_amount > 50000
and o.status = 'PAID'
and c.country = 'INDONESIA';
在DDD领域(Domain-Driven Design,领域驱动设计)有一种架构风格被广泛应用,即CQRS (Command Query Responsibility Seperation,命令查询职责分离)。CQRS最核心的概念是Command、Event,“将数据(Data)看做是事实(Fact)。每个事实都是过去的痕迹,虽然这种过去可以遗忘,但却无法改变。” 这一思想直接发展了Event Source,即将这些事件的发生过程记录下来,使得我们可以追溯业务流程。CQRS对设计者的影响,是将领域逻辑,尤其是业务流程,皆看做是一种领域对象状态迁移的过程。这一点与REST将HTTP应用协议看做是应用状态迁移的引擎,有着异曲同工之妙。
实现方案
Kafka in a Nutshell
Apache Kafka是由Apache软件基金会开发的一个开源消息中间件项目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka使用Zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助Zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。
- Kafka Core Words
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。Topic相当于数据库中的Table,行数据以log的形式存储,非常类似Git中commit log。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer:消息生产者,负责发布消息到Kafka broker
Consumer:消息消费者,向Kafka broker读取消息的客户端。
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定则属于默认的group)。
整体设计
案例:假设一个银行账户系统。经过一段时间的经营发展,该行客户数量和交易规模都有了巨大的增长,系统内部变得异常复杂,每一个部分都变得沉重不堪。我们尝试对他的业务单元进行解耦,例如将余额计算逻辑从原有的核心系统拆分出来。根据银行账户业务特点,我们设计一个生产者——负责根据业务事件触发生成一个事件,所有事件基于Kafka存储,再设计一个消费者——负责从Kafka抓去未处理事件,通过调用业务逻辑处理单元完成后续持久化操作。这样一个账户的所有业务操作都可以有完整的快照历史,符合金融业务Audit(审计)的需要。而且通过使用事件,我们可以很方便地重建数据。
业务事件列表:
- CreateEvent:开户
- DepositEvent:存款
- WithdrawEvent:取款
- TransferEvent:转账
领域模型:账户(Account)
holder’s name:持有人名称
balance:余额
registration date:开户日期
……
领域模型:事件(Event)
name:事件名称
ID:序号
……
环境准备
- 第一步,启动ZooKeeper:
1 | $ wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz |
第二步,启动Kafka
1
2
3
4
5
6
7
8
9
10$ bin/kafka-server-start.sh config/server.properties
[2017-06-13 14:03:08,168] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-06-13 14:03:08,172] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-13 14:03:08,172] INFO Kafka commitId : 3402a74efb23d1d4 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-13 14:03:08,173] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
$ lsof -nP -iTCP -sTCP:LISTEN | sort -n
$ netstat -an | grep 9092
tcp46 0 0 *.9092 *.* LISTEN第三步,创建topic
1 | $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic x-microservice-transactions-t1 |
- 另外,运行多个Kafka 实例
Kafka多实例非常简单,只需要复制文件 server.properties,稍作修改即可。1
2
3
4
5
6
7
8
9
10
11
12
13config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
// 启动多个broker,须指定不同的属性文件
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties
domain model
1 | package main |
Kafka & Redis library
1 | // main.go |
1 | package main |
消息生产者Producer
1 | package main |
1 | // kafka.go |
1 | package main |
1 | $ go build |
第二部分 消息消费者Consumer及其集群化
Consumer负责从Kafka加载消息队列。另外,我们需要为每一个事件创建process()函数。
1 | package main |
1 | package main |
Go语言通过goroutine提供了对于并发编程的直接支持,goroutine是Go语言运行库的功能,作为一个函数入口,在堆上为其分配的一个堆栈。所以它非常廉价,我们可以很轻松的创建上万个goroutine,但它们并不是被操作系统所调度执行。除了被系统调用阻塞的线程外,Go运行库最多会启动$GOMAXPROCS个线程来运行goroutine。
- goroutines: A goroutine is a lightweight thread of execution.
- channels: Channels are the pipes that connect concurrent goroutines. (<- operator)
- for: for is Go’s only looping construct. Here are three basic types of for loops.
- select: Go’s select lets you wait on multiple channel operations.
- Non-Blocking Channel Operations
1 | func consumeEvents(consumer sarama.PartitionConsumer) { |
重构main
1 | package main |
通过—act参数,可以启动一个消费者进程。当进程运行时,他将从Kafka一个一个拿出消息进行处理,按照我们之前在每个事件定义的Process() 方法。
1 | $ go build |
集群化消息消费者
问题:如果一个Consumer宕机了怎么办?(例如:程序崩溃、网络异常等原因)
解决方案:将多个Consumer编组为集群实现高可用。具体来说就是打标签,当有一个新的Log发送时,Kafka将其发送给其中一个实例。当该实例无法接收Log时,Kafka将Log传递给另一个包含相同标签的Consumer。
注意:Kafka 版本 0.9 +,另外还需要使用sarama-cluster库
1 | #使用govendor获取 |
1 | //修改mainConsumer方法使用sarama-cluster library连接Kafka |
即使程序崩溃,MarkOffset也会将消息标记为 processed ,标签包括元数据以及这个时间点的状态。元数据可以被另外一个Consumer恢复数据状态,也就能被重新消费。即即使同样的消息被处理两次,结果也是一样的,这个过程理论上是 幂等 的(idempotent)。
1 | //运行多个consumer实例 |
第三部分:测试驱动开发、Docker部署和持续集成
使用vendor管理Golang项目依赖
用govendor fetch
1 | $ // |
单元测试:ginkgo Test Suite
1 | $ go get github.com/onsi/ginkgo/ginkgo |
1 | package main_test |
1 | $ ginkgo |
单元测试的四个阶段
- Setup 启动
- Execution 执行
- Verification 验证
- Teardown 拆卸
Docker部署
Docker 容器中需要包含下列组件:
- Golang
- Redis、Kafka
- 微服务依赖的其它组件
在根目录创建一个Dockerfile1
2FROM golang:1.8.0
MAINTAINER Yanrui
1 | //install our dependencies |
由于容器本地并没有一个Redis实例运行在上面,这时运行ginkgo测试就会报错。我们为什么不在这个Dockerfile中包含一个Redis呢?这就违背了Docker分层解耦的初衷,我们可以通过docker-compose将两个服务连接起来一起工作。
创建一个docker-compose.yml文件(与Dockerfile目录一致):1
2
3
4
5
6
7
8
9
10
11
12version: "2.0"
services:
app:
environment:
REDIS_URL: redis:6379
build: .
working_dir: /go/src/go-microservice
links:
- redis
redis:
image: redis:alpine
本地构建完成之后,再次运行 docker-compose run app ginkgo 测试通过。
Infrastructure as Code(基础设施即代码)
The enabling idea of infrastructure as code is that the systems and devices which are used to run software can be treated as if they, themselves, are software. — Kief Morris
云带来的好的一方面是它让公司中的任何人都可以轻松部署、配置和管理他们需要的基础设施。虽然很多基础设施团队采用了云和自动化技术,却没有采用相应的自动化测试和发布流程。它们把这些当作一门过于复杂的脚本语言来使用。他们会为每一次具体的改动编写手册、配置文件和执行脚本,再针对一部分指定的服务器手工运行它们,也就是说每一次改动都还需要花费专业知识、时间和精力。这种工作方式意味着基础设施团队没有把他们自己从日常的重复性劳动中解放出来。目前已经有很多商业云平台提供了Docker服务,只需要将自己的 git repository 链接到平台,即可以自动帮你完成部署,在云上完成集成测试。
1 | docker-compose build |
扩展阅读:开源架构技术漫谈
- Stack Overflow:2017年最赚钱的编程语言
- DevOps 漫谈:基于OpenCensus构建分布式跟踪系统
- 基于Go语言快速构建一个RESTful API服务
- 基于Kafka构建事件溯源型微服务
- 远程通信协议:从 CORBA 到 gRPC
- 应用程序开发中的日志管理(Go语言描述)
- 数据可视化(七)Graphite 体系结构详解
- 动态追踪技术(一):DTrace 导论
- 动态追踪技术(二):strace+gdb 溯源 Nginx 内存溢出异常
- 动态追踪技术(三):Tracing Your Kernel Function!
- 动态追踪技术(四):基于 Linux bcc/BPF 实现 Go 程序动态追踪
- 动态追踪技术(五):Welcome DTrace for Linux
- DevOps 资讯 | LinkedIn 开源 Kafka Monitor
参考文献
Theory
- 康威定律
- Mike Amundsen 《远距离条件下的康威定律——分布式世界中实现团队构建》
- Kief Morris《Infrastructure as Code - Managing Servers in the Cloud》
- InfoQ:《Infrastructure as Code》书评与摘要
Microservices
- (推荐)Martin Fowler : Microservices
- 李颖杰:为什么要重构到微服务(案例)
- Using GraphQL with Microservices in Go
Event Sourcing
- (推荐) Writing and Testing an Event Sourcing Microservice with Kafka and Go
- Linkedin Profile:Adam Pahlevi Baihaqi
- (推荐) OKONKWO VINCENT IKEM:Building Scalable Applications Using Event Sourcing and CQRS
- (推荐)Microsoft Azure:Event Sourcing
- 张逸:对CQRS的基础理解
- 汤雪华:领域驱动设计之领域模型
- 汤雪华:什么是事件溯源(Event Sourcing)
- InfoQ:A Whole System Based on Event Sourcing is an Anti-Pattern
Zookeeper & Kafka
- Kafka QuickStart
- Apache.org:Kafka 0.9 Consumer Rewrite Design
- Quora:What is the actual role of ZooKeeper in Kafka?
- grokbase:Relationship between Zookeeper and Kafka
- 朱赟:白话 IT 之要不要从 rabbitMQ 转 kafka?