睿云智合 | 分布式组件etcd应用


作者:周益

一、搭建etcd环境

我将使用 etcd v3版本, so,本地先建个单机版本ETCD环境。
docker pull xieyanze/etcd3:latest

docker run —name etcd-v3.0.9 -d -v /tmp:/data \
      -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest

docker exec -it etcd-v3.0.9 sh
当前默认还是v2版本通过设定环境变量export ETCDCTL_API=3,设置成V3版本。
export ETCDCTL_API=3

etcdctl put /test/ok 11
etcdctl put /test/ok 22

etcdctl del  /test/gg

删除所有/test前缀的节点

etcdctl del  /test --prefix

etcdctl get /test/ok

前缀查询

etcdctl get /test/ok --prefix


二、软件逻辑结构

周益1微信图片_20171205160245.png

  • k8s master cluster dev-7 dev-8
  • k8s slave cluster 1 env1 dev-1 dev-2dev-3
  • k8s slave cluster 2 env2 dev-4 dev-5 dev-6


三、controller 与 agent 服务注册与发现

实现原理:

注意: etcd v3版本,k/v 的超时间时TTL最小5秒种。
  • 每2秒钟,每个服务向etcd发送一次心跳包,证明自己还活着; 当服务退出时,主动删除etcd的key或者等到TTL超时之后,自动下线;
  • controller需要获得agent的状态,直接GET [ingress/agent/${env_uuid}/]就能获得当前agent在线状态;
  • agent需要获得controller的状态,直接GET [ingress/controller]就能获得当前controller在线状态。


周益2微信图片_20171205161208.png


周益3微信截图_20171205161343.png


四、软件业务的实现

1. controller side:

客户端调用controller restful api.controller 直接写入ETCD,同时写入副本到mysql;
controller 如果关注于agent的变化,只需要watch ingress/agent这个目录;
controller 是无状态,不需要同步多个实例之间的数据,可以任意的scale它的实例数;
如果controller挂掉之后,重启加载mysql的数据库同步到etcd中。

2. controller需要了解规则执行状态

周益4微信图片_20171205161648.png


周益5微信截图_20171205161923.png


agent的执行状态直接写入配置状态中,
先获得当前ingress/agent/env1目录下的agent列表,对比ingress/ingress_agent/env1/${config_uuid1}/status目录下的规则完成之后反馈列表,每一个都存在时,则全部执行成功。

3. agent side:
  • 不同集群agent 通过etcd的watch功能在第一时间,获得监听到所有数据的变化 :新建、删除、更新;
  • 不同集群agent定时3分钟获得自已环境下的列表信息,同步处理相关信息;
  • 如果agent挂了之后,,重启加载一次etcd中所有的ingress_conifg。


五、代码实例

1. etcd clientv3 的封装
  • 连接管理支,持TLS;
  • 增、删、查,支持自动超时的设值;
  • watch 监听目录或KEY的值的变化(PUT,DELETE)。

package main

import (
"fmt"
"time"
//  "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
//  "sync"
)


type EtcdData struct {
Key              string
Value            string
}

type EtcdHelper struct {
RequestTimeout   time.Duration
Client           *clientv3.Client
}

func NewEtcdHelper() *EtcdHelper {

//tlsInfo := transport.TLSInfo{
//  CertFile:      "/tmp/test-certs/test-name-1.pem",
//  KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
//  TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
//}
//tlsInfo := transport.TLSInfo{
//  CertFile:      "./tls/apiserver.crt",
//  KeyFile:       "./tls/apiserver.key",
//}
//tlsConfig, err := tlsInfo.ClientConfig()
//if err != nil {
//  fmt.Printf("%s", err.Error())
//  return nil
//}

//cli, err := clientv3.New(clientv3.Config{
//  Endpoints: []string{"dev-7:2379"},
//  DialTimeout: 3 * time.Second,
//  TLS:         tlsConfig,
//})

cli, err := clientv3.New(clientv3.Config{
    Endpoints: []string{"127.0.0.1:2379"},
    //Endpoints: []string{"http://dev-7:2379"},
    DialTimeout: 3 * time.Second,
})

if err != nil {
    fmt.Printf("%s", err.Error())
    return nil
}

return &EtcdHelper{
    RequestTimeout: 5 *time.Second,
    Client: cli,
}
}

func (c *EtcdHelper) Release() {
if c.Client != nil {
    c.Client.Close()
}
}

func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()

// minimum lease TTL is 5-second
resp, err := c.Client.Grant(context.TODO(), ttl)
if err != nil {
    fmt.Printf("%s\n", err.Error())
    return err
}

_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
if err != nil {
    fmt.Printf("%s\n", err.Error())
    return err
}

return nil
}

func (c *EtcdHelper) SetValue(key string, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()

_, err := c.Client.Put(ctx, key, value)
if err != nil {
    fmt.Printf("%s\n", err.Error())
    return err
}

return nil
}

func (c *EtcdHelper) GetValue(key string) []EtcdData {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()
resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
    fmt.Printf("%s\n", err.Error())
    return nil
}

var kv_slice []EtcdData
for _, ev := range resp.Kvs {
    //fmt.Printf("%s : %s\n", ev.Key, ev.Value)

    kv := EtcdData{string(ev.Key), string(ev.Value)}
    kv_slice = append(kv_slice, kv)
}

return kv_slice
}

func (c *EtcdHelper) DelValue(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()
_, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
if err != nil {
    fmt.Printf("%s\n", err.Error())
    return err
}

 return nil
}

func (c *EtcdHelper) Watch(key string) {
rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
for wresp := range rch {
    for _, ev := range wresp.Events {
        fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
}
}

func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
}


2. controller 的代码实现
  • controller上线,下线功能;
  • controller定时发送心跳包到etcd;
  • controller监听agent的变化.(1-3)完成服务注册与发现;
  • controller通过下发配置到etcd,通知所有watch ingress_config变化的agent。

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type ControllerClient struct {
Period   time.Duration
Name     string
IP       string
Helper   *EtcdHelper

StopCha  chan int

//Lock     *sync.Mutex
}

func NewControllerClient(name string, host_ip string) *ControllerClient {
return &ControllerClient{
    Period:  2,
    Name:    name,
    IP:      host_ip,
    Helper:  NewEtcdHelper(),
    StopCha: make(chan int, 10),
    //Lock:    new(sync.Mutex),
}
}

func (cc *ControllerClient) Init(display bool) {
go func() {
    cc.OnLine()

    for {
        select {
        case <-cc.StopCha:
            fmt.Printf("online goroutinue is exited.")
            return
        case <-time.After(time.Second * cc.Period):
            cc.OnLine()
        }
    }
}()


if display {
    go func() {
        watch_chan := cc.Helper.Listen("/ingress/agent")
        for wresp := range watch_chan {
            for _, ev := range wresp.Events {
                fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            }
        }
    }()
}
}

func (cc *ControllerClient) OnLine() {
key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.PutValue(key, "1", 5)

if err != nil  {
    fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) OffLine() {
close(cc.StopCha)

key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.DelValue(key)

if err != nil  {
    fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
//TODO. first save to mysql.
key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
//cc.Lock.Lock()
//defer cc.Lock.Unlock()
return cc.Helper.GetValue(key)


}

func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
//TODO. first save to mysql.

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.SetValue(key, config)

if err != nil  {
    fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
//TODO. first update to mysql.

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.DelValue(key)

if err != nil {
    fmt.Printf(err.Error())
}
}


3. agent代码实现
  • agent代码实现
  • agent上线,下线功能; agent定时发送心跳包到etcd;
  • agent监听(watch)controller的变化,(1-3)完成服务注册与发现;
  • agnet监听(watch) ingress_config变化的agent,实时完成更新或设置配置,删除配置功能。

type AgentClient struct {
LivePeriod         time.Duration
FirstConfigPerid   time.Duration
SyncConfigPeriod   time.Duration


Name         string
EnvUUID      string
IP           string
Helper       *EtcdHelper

StopCha      chan struct{}
}

func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
return &AgentClient{
    LivePeriod:         2,
    FirstConfigPerid:   3,
    SyncConfigPeriod:   60,


    Name:           name,
    EnvUUID:        env_uuid,
    IP:             host_ip,
    Helper:         NewEtcdHelper(),
    StopCha:        make(chan struct{}, 1),
}
}

func (ac *AgentClient) Init(display bool) {
//我还活着,不要干掉我.
go func() {
    ac.OnLine()

    for {
        select {
        case <-ac.StopCha:
            return
        case <-time.After(time.Second *ac.LivePeriod):
            ac.OnLine()
        }
    }
}()

//if display {
//  go func() {
//      watch_chan := cc.Helper.Listen("/ingress/agent")
//      for wresp := range watch_chan {
//          for _, ev := range wresp.Events {
//              fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//          }
//      }
//  }()
//}

//重启之后,第一次同步 和 定期同步.
//go func() {
//
//  time.Sleep(time.Second * ac.FirstConfigPerid)
//  ac.SyncIngressConfigs()
//
//  for {
//      select {
//      case <-ac.StopCha:
//          return
//      case <-time.After(time.Second * ac.SyncConfigPeriod):
//          ac.SyncIngressConfigs()
//      }
//  }
//}()

if display {
    //监听controller变化(等待处理掉线自动重连后,重监听)
    go func() {
        watch_chan := ac.Helper.Listen("/ingress/controller")
        for wresp := range watch_chan {
            for _, ev := range wresp.Events {
                fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
            }
        }
    }()
}

//监听本环境下ingress_config的变化(等待处理掉线自动重连, 重监听)
go func() {
    key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
    watch_chan := ac.Helper.Listen(key)
    for wresp := range watch_chan {
        for _, ev := range wresp.Events {
            fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

            switch ev.Type.String() {
            case "PUT":
                fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
                //TODO: SetIngressConfig(key, value)
                break
            case "DELETE":
                fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name,  ev.Kv.Key)
                //TODO: DelIngressConfig(key)
                break
            }
        }
    }
}()
}

func (ac *AgentClient) OnLine() {
key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
if err != nil  {
    fmt.Printf(err.Error())
}
}

func (ac *AgentClient) OffLine() {
//ac.StopCha <- 1
close(ac.StopCha)

key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
err := ac.Helper.DelValue(key)
if err != nil  {
    fmt.Printf(err.Error())
}
}

func (ac *AgentClient) UpdateIngressStatus(uuid string) {
key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
err := ac.Helper.DelValue(key)
if err != nil  {
    fmt.Printf(err.Error())
}
}

//服务重启之后,第一次先调用 并用 定时同步
func (ac *AgentClient) SyncIngressConfigs() {
key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
kv_slice := ac.Helper.GetValue(key)
if kv_slice != nil {
    //TODO: ingressConfig.SyncIngressConfigs(kv_slice)
    for _, kv := range kv_slice {
        fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
    }
}
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func main() {

controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
controller1.Init(false)
controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
controller2.Init(false)
controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
controller3.Init(false)


agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
agent1.Init(false)

agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
agent2.Init(false)

agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
agent3.Init(false)

agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
agent4.Init(false)

agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
agent5.Init(false)

agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
agent6.Init(false)

agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
agent7.Init(false)

agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
agent8.Init(false)

agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
agent9.Init(false)

agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
agent10.Init(false)


time.Sleep(time.Second*1)
controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0001")

controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0002")

controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0003")

controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0004")

controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0005")




forever := make(chan struct{})
<-forever
}


总结

本文没有完成自动重连功能,代码仅作演示,后续会完善这部分功能。etcd 作为k8s的核心主件, 可以用这样的方式加入生产代码中。在agent侧, agent实例很多的情况下,watch这样的效率会很高。

文末福利4微信截图_20171116153321.png

0 个评论

要回复文章请先登录注册