技术漫谈 | 集群中容器间的通迅


作者:周益

相关技术

redis 服务发现
mysql 数据持久化
golang 原生rpc-json

我们在k8s中启动3个restful api service实例,当对外提供服务的时候,这个api service 可能出现实例的内存数据不一致的情况。

原本consul作为服务发现是最佳的。为了进一步减少组件,我们使用redis来完成服务发现的功能。

11微信图片_20171010153011.png


APService 3个实例如下
  • Instance1 192.168.0.101
  • Instance2 192.168.0.102
  • Instance3192.168.0.103


1.instance 启动之后,每秒发送一次心跳数据包。如果3秒之类没有发起,redis自动过期。
我们可以认为instance下线了。

12微信截图_20171010153131.png


2.获得在线的服务器列表

13微信截图_20171010153238.png


Response:
  • Instance1
  • Instance2
  • Instance3


3.获得真实的

14微信截图_20171010153352.png


Response:
  • 192.168.0.101
  • 192.168.0.102
  • 192.168.0.103


当外部客户端调用如果Instance 有写入数据的操作,有以下的4个步骤:
  1. 获得除了自己之外的在线服务器ip地址。
  2. 通过RPC调用直接修改其它服务器的内存数据 Instance1--rpc-->Instance2
    修改其内存数据 Instance1--rpc-->Instance3 修改其内存数据。
    3.RPC调用都成功后,再将数据写入mysql数据库中,否则对外报告失败了。
    4.仅由当前的Instance1下发配置信息到agent。


容器之间的通迅可以使用golang 原生的rpc-json 来实现peer to peer调用

15微信图片_20171010154043.jpg


每个instance 都作为服务器,又都作为客户端。
1.instance 启动时,监听着7771端口。
globalClusterServer := NewClusterServer()  
globalClusterServer.Init()


2.instance 在需要的同步数据的地方直接调用。
globalClusterClient = NewClusterClient()
globalClusterClient.Init()if err := globalClusterClient.StoreIngressRule(request, response); err!=nil {    log.Error(err.Error())
}if err := globalClusterClient.StoreIngressConfig(reqeust, response); err!=nil {    log.Error(err.Error())
}


cluster_server.go
package ingress

import ("fmt"
"errors"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/astaxie/beego/utils"
"wise2c/wisecloud-ingress-controller/log")

type ICAPIServer intfunc (t *ICAPIServer) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {if request == nil {
return errors.New("request is empty.")
}if response == nil {
return errors.New("response is empty.")
}

err := globalIngresProcess.MemStoreIngressRule(&request.IngressRule)if err != nil {
response.Set(0, fmt.Sprintf("MemStoreIngressRule is failed.%s", err.Error()))
return err
}

response.Set(1, "ok")
return nil
}//////////////////////////////////////////////////////////////////////////////////////type ClusterServer struct {
Portocol string
Port     string
Listener   *net.TCPListener
}func NewClusterServer() *ClusterServer {
return &ClusterServer{
Portocol  : "tcp",
Port      : common.ClusterPort,
Listener    : nil,
}
}func (this *ClusterServer) Init() {
icapi_server := new(ICAPIServer)
rpc.Register(icapi_server)

addr, _ := net.ResolveTCPAddr("tcp", this.Port)
var err error
this.Listener, err = net.ListenTCP("tcp", addr)if err != nil {
panic(err)
}
log.Info("The Controller's cluster server is listened[%s].", addr)

go this.Listen()
}func (this *ClusterServer) Listen() {for {
conn, e := this.Listener.Accept()if e != nil {continue
}
go jsonrpc.ServeConn(conn)
}
}


globalClusterServer := NewClusterServer()
globalClusterServer.Init()


cluster_client.go
"net/rpc/jsonrpc"
"wise2c/wisecloud-ingress-controller/log"
"wise2c/wisecloud-ingress-controller/common")type ICAPIClient struct {
Client     *rpc.Client
Retry      int}func NewICAPIClient(address string) (*ICAPIClient, error) {
api_client := &ICAPIClient{
Client :nil,
Retry  :3,
}var err error
api_client.Client, err = jsonrpc.Dial("tcp", address)if err != nil {return nil, errors.New(fmt.Sprintf("ICAPIClient connect %s is failed. %s", address, err.Error()))
}return api_client, nil}func (this *ICAPIClient) Close() error {if this.Client == nil {return nil
}

err := this.Client.Close()if err != nil {return errors.New(fmt.Sprintf("ICAPIClient close is failed. %s", err.Error()))
}

this.Client = nil
return nil}func (this *ICAPIClient) Handler(func_name string,request interface{}, response interface{}) error {if this.Client == nil {return errors.New(fmt.Sprintf("ICAPIClient's client is null"))
}for i:=0; i<this.Retry; i++ {var err error
err = this.Client.Call(func_name, request, response)if err != nil {
log.Error(fmt.Sprintf("%s is failed. %s", func_name, err.Error()))
time.Sleep(1*time.Second)continue
}break
}return nil}func (this *ICAPIClient) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {return this.Handler("ICAPIServer.StoreIngressRule", request, response)
}func (this *ICAPIClient) StoreIngressConfig(request *RequestStoreIngressConfig, response *ResponseStoreIngressConfig) error {return this.Handler("ICAPIServer.StoreIngressConfig", request, response)
}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////type ClusterClient struct {
Portocol string
Port     string}func NewClusterClient() *ClusterClient {return &ClusterClient{}
}func (this *ClusterClient) Init() {
this.Portocol = "tcp"
this.Port     = common.ClusterPort
}type IngressRuleCallback func(request, response interface{}, client *ICAPIClient) errorfunc (client *ClusterClient) IngressRuleHandler(request, response interface{}, callback IngressRuleCallback )  (err error){
cluster_ips, err := globalRedisClient.GetClusterOtherIP(globalSelfPodName)if err != nil {return errors.New(fmt.Sprintf("GetOtherController() is failed. %s", err.Error()))
}

err_msg := ""
for _, ip := range cluster_ips {

address := fmt.Sprintf("%s%s", ip, client.Port)

rpc_client, err := NewICAPIClient(address)if err != nil {
err_msg += fmt.Sprintf("connect cluster %s is failed.%s", address, err.Error())continue
}defer rpc_client.Close()if callback != nil {
err = callback(request, response, rpc_client)if err != nil {
err_msg += fmt.Sprint("call cluster is failed.", address, err.Error())
}
}
}if err_msg != "" {return errors.New(err_msg)
}return}func (this *ClusterClient) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {return  this.IngressRuleHandler(request, response, func(request, response interface{}, client *ICAPIClient) error {return client.StoreIngressRule(request.(*RequestStoreIngressRule), response.(*ResponseStoreIngressRule))
})
}func (this *ClusterClient) StoreIngressConfig(request *RequestStoreIngressConfig, response *ResponseStoreIngressConfig) error {return  this.IngressRuleHandler(request, response, func(request, response interface{}, client *ICAPIClient) error {return client.StoreIngressConfig(request.(*RequestStoreIngressConfig), response.(*ResponseStoreIngressConfig))
})
}

globalClusterClient = NewClusterClient()
globalClusterClient.Init()


文末福利:请大家关注【Wise2C】并回复【进群】,睿云小助手会第一时间拉你进入【 Docker企业落地实践群】,我们分享的各个企业案例项目的技术专家与用户代表,正在敬候您的光临,期待大家就项目的更多细节与疑问与群里的大牛们进行咨询探讨。
需要了解更多有关睿云智合的客户项目细节,请在Wise2C公众号中最佳实践菜单中查看。
若需要了解更多有关Wise系列PaaS产品的详情,请与我们的市场团队联系:contact@wise2c.com

睿云智合微信截图_20170926111623.png

0 个评论

要回复文章请先登录注册