DockOne微信分享(一五六):由浅入深SCF无服务器云函数实践


【编者的话】近年来,互联网服务从一开始的物理服务器托管,虚拟机,容器,发展到现在的云函数,逐步无服务器化。程序员逐步聚焦于最核心的业务逻辑开发,解放了生产力,显著提升服务上线效率。云函数带来了真正的计算服务。

近年来,互联网服务从一开始的物理服务器托管,虚拟机,容器,发展到现在的云函数,逐步无服务器化,如下表所示。程序员逐步聚焦于最核心的业务逻辑开发,解放了生产力,显著提升了服务上线效率。
1515985322(1).png

云函数带来了真正的计算服务,如下表所示,类比腾讯云COS对象存储,SCF以函数为单位封装计算,按需调度执行,无须关心函数的自动扩缩容,故障容灾等,无任何闲置成本。
1515987772(1).png


云函数给用户带来的价值主要3点:
  1. 简化架构:函数粒度的微服务架构,使得系统的各个功能天然解耦,能像搭积木一样组合自有及外部服务,实现所看即所得的后台服务;
  2. 简化开发:无需关注底层硬件配置、OS,服务启停、网络收发,故障容灾,服务扩缩容等,只需写最核心的业务逻辑,实现真正的代码即服务;
  3. 简化运维:无须关注服务部署,服务器运维,安全管控,扩缩容配置等,且应用能无缝升级,实现无痛切换到DevOps模式。
  4. 减少支出:无闲置成本,仅对函数资源大小,执行时间,执行次数按需计费,相对云主机平均5%~15% 的使用率,价格优势明显,实现了最彻底的按需计费。


我们团队正在做弹性计算相关的事情,业务需求多,平台自身也需持续优化来支撑不断扩大的运营规模,现在5人左右的小团队要支撑100w核级别的计算运营,云函数的出现,正好解了我们在人力上的燃眉之急,在这里分享一下,希望能对大家有所启发。

使用云函数实现主动拨测工具

我们有一些低频调用的http服务,比如buffer池空闲机器借还,上架等,这类服务用户调用出错时处理代价较大,要确保用户调用时服务正常,需要有主动拨测的机制,先于用户发现并修复问题,在云函数出现之前,需要开发拨测工具,实现定时调用,并实现工具本身的故障容灾能力,且要申请2台以上的虚拟机或容器发布部署,既耗费人力,也耗费资源。应用云函数后,我们只需简单的3步便可实现:
  1. 在SCF云函数平台创建一个函数,如下图所示。
    请输入图片名称
  2. 配置该函数为定时触发,比如5分钟触发一次,如下图所示,配置完成后服务即刻启用。
    2.png
  3. 可以在日志页面查看函数运行状态,当检测到异常时,会调用告警工具发送告警微信。
    3.png


在主动拨测工具这个场景,我们从云函数获得的收益主要是快速成形,且无需运营维护,达到了既定目标同时,没有额外增加运营成本。SCF无服务器云函数为每个用户设置了免费额度,该应用场景几乎肯定能包含在免费额度之内。

使用云函数规整运营统计脚本

我们之前用Python开发了大量统计脚本,用来展现平台的运营概况,可用性,质量,趋势等,由crontab驱动每日定期执行,随着时间累积及人员的更替,这些脚本部署管理逐步混乱,比如想要修复某个数据时,可能不知道脚本部署在哪,或者某天服务器故障,恢复统计脚本的正常运营比较麻烦,针对这些问题,我们利用云函数简单包装便可解决,比如下面是一个统计运营中母机数的函数,直接import原脚本,在入口函数内调用即可。
# -*- coding: utf-8 -*-
from ctypes import *
import os
import base64
import json
import calculate_biz_host_num

def main_handler(event, context):
return calculate_biz_host_num.main()

if __name__ == '__main__':
'''just for test'''
event = {}
lambda_handler(event, 4)

在规整运营统计脚本这个场景,我们从云函数获得的主要收益是快速帮助我们把散落到各台服务器的脚本规整起来统一维护,且再也不用担心统计脚本运营与服务器故障问题。

使用云函数快速嵌入图片类型识别功能

我们有一个图片压缩服务,上传时压缩图片以降低存储容量及下载带宽消耗,压缩的效果要达到图片质量与压缩比的均衡,在某些场景,比如微信朋友圈,存在一些广告图片,用户一般不会关注其细节,故可以提高压缩比,牺牲质量以进一步的降低运营成本,而图片类型的识别计算复杂度高,无法在逻辑svr本地完成,传统的办法是实现一个图片类型识别服务,但实现该服务需要开发工作量较大,比如需要写接入,逻辑server,实现容灾分布,负载均衡等,且由于图片上传有明显的波峰波谷效应,还需要实现自动扩缩容,不仅如此,部署也较为复杂,难以满足快速试错的需求。
请输入图片名称
应用云函数后,我们只需创建并实现一个类型识别函数,如下所示,在函数里调用算法工程师实现的C++图片识别程序即可,无须关心容灾分布,负载均衡,自动扩缩容及服务的部署与运维等。
# -*- coding: utf-8 -*-
from ctypes import *
import os
import pictype
import base64
import json

def main_handler(event, context):
str = pictype.cppmain(event["pic_data"])
jso = json.loads(str)
print jso["QRCode"]
return str

if __name__ == '__main__':
'''just for test'''
event = {}
imageFile = open("2qrcode.jpg","rb")
event["pic_data"] = base64.b64encode(imageFile.read())
imageFile.close()
lambda_handler(event, 4)

在嵌入图片类型识别功能这个场景,我们从云函数获得的主要收益是使用极小的成本便快速扩展了现有平台的能力,短时间内便试错验证了依据图片类型选择不同压缩比在运营成本上的收益。

使用云函数实现游戏AI数据预处理

尝到甜头后,我们越来越有信心使用云函数来实现更复杂的需求,正好当前在支持游戏AI团队做一些计算,典型的AI计算过程如下图所示,模型训练前的数据预处理耗费了大量的时间与计算资源。
5.png
以王者荣耀的AI为例,如下图所示,数据预处理一般分为两步:
  1. Mapper计算:从cos读取游戏录像文件,提取英雄等级,血量,攻击,法强,技能冷却等特征,使用HDF5文件保存;
  2. Reducer计算:读取选定范围的HDF5文件,shuffle处理随机化后,规整成每个文件5120帧,再输出供模型训练使用;


6.png
我们应用云函数实现该预处理,只需实现mapper/reducer计算函数,并配置合适的计算触发规则即可,比如实现Mapper函数如下所示(省略若工具型函数代码),并配置为cos上传触发,这样当有录像文件上传时,可自动调用mapper函数转化为HDF5文件。
# -*- coding utf-8 -*-
import os
import sys
import datetime
import traceback
import shutil
import commands
import cos_sdk


def main_handler(event, context):
res = map_caller(event, context)
if res == 0:
    return "succ"
else:
    return "fail"

def map_caller(event, context):
# Note: this is test account, change to own cos appid and secret_id
appid = '123443xxxx'
secret_id = 'QmFzZTY0IGlzIGEgZ2Vxxxx'
secret_key = 'AKIDZfbOA78asKUYBcXFrJD0a1ICvxxxx'
host = 'sz.cxxxxxx'
addr = '10xxxx'

bucket = event['bucket']
cos_input_file = event['input']
cos_output_key = event['output']
cos_file_name = cos_input_file.split('/', 1)[1]
print("cos_file_name: ", cos_file_name)

# step 1. Download .abs file from cos
cos = cos_sdk.CosHandler(appid, bucket, secret_id, secret_key, host, addr )
container_base_path = '/tmp/AITest/mapper'
container_input_path = '/tmp/AITest/mapper/input/'
container_output_path = '/tmp/AITest/mapper/output/'
cos_output_path = 'mapdata/'

try:
    if not os.path.exists(container_base_path):
        os.makedirs(container_base_path)
    if not os.path.exists(container_input_path):
        os.mkdir(container_input_path)
    if not os.path.exists(container_output_path):
        os.mkdir(container_output_path)
except:
    traceback.print_exc()
    return -1

ret = cos.download_file('/', cos_input_file, container_input_path, cos_file_name)
if not ret:
    print("Download file from cos Failed [%s]" % cos_file_name)
    return -1
print("Download file [%s] succ" % cos_file_name)

# step 2. transfer .abs file to .hdf5
ret = transfer_data(container_base_path, container_input_path, cos_file_name, container_output_path)
if not ret:
    print("transfer data fail")
    return -1

# step 3. upload .hdf5 file to cos
output_filename = get_output_file(cos_file_name, container_output_path)
if output_filename == "":
    return -1
print(container_output_path+output_filename)
if not os.path.exists(container_output_path+output_filename):
    print "output file not exist"
print(cos_output_path+output_filename)
ret = cos.upload_file(container_output_path+output_filename, cos_output_path+output_filename)
if not ret:
    return -1

## clean up result files
shutil.rmtree(container_output_path)
return 0

def transfer_data(base_path, file_path, file_name, output_path):
try:
    CurPath = '/var/user'
    if os.path.exists(base_path + '/transfer_script'):
        shutil.rmtree(base_path + '/transfer_script')
    shutil.copytree(CurPath + '/transfer_script', base_path + '/transfer_script')
    StartPath = base_path + "/transfer_script/5v5_vecmodel_tactics"
    InputFilePath = file_path + file_name
    OutputPath = output_pat
    sgameBinFile = base_path + "/transfer_script/log_transform/bin/transform/sgame_log_transform"
    os.chmod(sgameBinFile, 755)
    labelBinFile = StartPath + "/label"
    featureBinFile = StartPath + "/VecFeatureExtract"
    os.chmod(labelBinFile, 755)
    os.chmod(featureBinFile, 755)
    os.chdir(StartPath)
    cmd = 'sh start.sh ' + InputFilePath + ' ' + OutputPath
    (status, output) = commands.getstatusoutput(cmd)
    print("status: ", status)
    print("output: ", output)
except:
    traceback.print_exc()
    return False

if status == 0 and output == "pipeline success":
    return True
else:
    return False

实现Reducer函数如下所示(省略若工具型函数代码),亦可配置cos写文件触发,当上传文件数达到一定数量且符合其他条件时,执行reducer函数的处理功能。
# -*- coding utf-8 -*-
import os
import sys
import traceback
import shutil
import commands
import re
import common
import cos_sdk

try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET


def main_handler(event, context):
res = reducer_caller(event, context)
if res == False:
    return "fail"
else:
    return "succ"

def reducer_caller(event, context):
# Note: this is test account, change to own cos appid and secret_id
appid = '12344321xxx'
secret_id = 'QmFzZTY0IGlzIGEgZ2xxxx'
secret_key = 'AKIDZfbOA78asKUYBcXFrJD0axxx'
host = 'sz.xxxx'
addr = '10.xxxx'

bucket = 'mapreduce'
container_base_path = '/tmp/AITest/reducer'
container_mapfile_path = '/tmp/AITest/reducer/mapdata/'
container_output_path = '/tmp/AITest/reducer/output/'
cos_mapdata_dir = '/'
cos_output_key = u'output/'

## AI shuffle config
p0_thread_num = '30'
p1_thread_num = '4'
ai_bucket = '4'
sample_num = '5120'

## init container directory
try:
    if not os.path.exists(container_base_path):
        os.makedirs(container_base_path)
    if not os.path.exists(container_mapfile_path):
        os.mkdir(container_mapfile_path)
    if not os.path.exists(container_output_path):
        os.mkdir(container_output_path)
except:
    traceback.print_exc()
    return False

cos = cos_sdk.CosHandler(appid, bucket, secret_id, secret_key, host, addr)

## step 1. get all mapper output data name (*.abs)
min_mapfiles = 40
mapfiles = get_mapfiles(cos, bucket)
if mapfiles == []:
    print("No exist data map file in cos, please run lambda mapper first")
    return False
elif len(mapfiles) < min_mapfiles:
    print("No enough map files in cos, at least %d map files can trigger shuffle process" % min_mapfiles)
    return False

## step 2. download mapper data from cos
for mapfile in mapfiles:
    download_ret = download_file(cos, cos_mapdata_dir, mapfile, container_mapfile_path)
    if download_ret != 0:
        return False   

# step 3. shuffle mapper input file
ret = shuffle_data(container_base_path, container_mapfile_path, container_output_path, p0_thread_num, p1_thread_num, ai_bucket, sample_num)
if not ret:
    print("shuffle data fail")
    return False

# step 4. upload .hdf5 file to cos
output_files = get_output_files(container_output_path)
if len(output_files) == 0:
    print("No output results in *.hdf5")
    return False

all_upload_ret = 0
print(output_files)
for output_file in output_files:
    upload_ret = upload_file(cos, container_output_path, output_file, cos_output_key)
    all_upload_ret += upload_ret
    if upload_ret != 0:
        print("Upload output file [%s] to cos failed" % output_file)
if all_upload_ret != 0:
    return False

## clean up result files
shutil.rmtree(container_output_path)
return "Finish shuffle data"


def get_mapfiles(cos_client, bucket):
status, ret_msg = cos_client.list_object(bucket)
if str(status)[0] != '2':
    print("Get map data file error")
    return -1

mapfiles = []
root = ET.fromstring(ret_msg)
for key in root.findall('Contents'):
    filename = key.find('Key').text
    if re.match(r'^mapdata/', filename):
        mapfiles.append(filename)
return mapfiles

def shuffle_data(container_base_path, container_mapfile_path, container_output_path, p0_thread_num, p1_thread_num, ai_bucket, sample_num):
try:
    CurPath = '/var/user'
    shuffle_tools = '/shuffle_all_tools/'
    if os.path.exists(container_base_path + shuffle_tools):
        shutil.rmtree(container_base_path + shuffle_tools)
    shutil.copytree(CurPath + shuffle_tools, container_base_path + shuffle_tools)

    os.chdir(container_base_path + shuffle_tools)
    cmd = 'sh king_shuffle_start.sh ' + container_mapfile_path + ' ' + p0_thread_num + ' ' + p1_thread_num + ' ' + container_output_path + ' ' + ai_bucket + ' ' + sample_num
    print cmd
    val = os.system(cmd)
    print val
    return True
except:
    traceback.print_exc()
    return False

在游戏AI训练数据预处理这个场景,我们从云函数获得的主要收益是快速实现数据预处理服务,避免AI工程师陷入到考虑计算分布化,容灾,扩容,服务器故障处理等平台性事项中,能够更专注于算法设计;另外AI计算资源耗费量巨大,云函数实现了资源真正按需分配,无需保留大批服务器造成资源浪费。

在应用SCF无服务器云函数实践过程中,深刻体会到了其减少设计开发,运营维护工作量及在运营成本方面的优势,如果开发新的功能,云函数会成为我们团队的首选,作为团队架构师,应该承当好的一个责任是与时俱进的引入新生产力工具,持续推进团队开发运营效率提升及持续的追求成本优化,由于云函数在加速服务上线时间方面革命性的优势及按需使用计费的特点,它可能会比docker容器更快被广泛接受,谁能更快的拥抱云函数,谁便能更快的建立研发与运营的优势,欢迎大家试用腾讯云-SCF无服务器云函数,一起更好的迎接并促进无服务器时代的到来。

Q&A

Q:云函数怎么和其它业务系统集成?

A:在腾讯云产品中,云函数已经和COS、CMQ、API gateway、日志等打通,可直接配置事件触发关系,另外用户在函数代码里,可自行实现与其它业务系统集成的代码,当前云函数直接可访问公网服务,马上可访问用户VPC里的服务。
Q:云函数实际应用中,和一般写法有什么不同,有什么缺点?

A:云函数当前支持Python2.7、3.6,Node.js 6.10,Java 8等运行环境,可在本地开发编写代码上传,也可在云端直接编写,对比一般程序的写法,无须实现网络监听,故障容灾,扩容,日志监控等相关代码,极大的减少大家的代码开发量,缺点是调试不如本地方便,比如不能直接用GDB等工具单步调试。
Q: 腾讯SCF对于有状态服务是怎么滚动更新的扩容的?

A: SCF一般用来承载无状态的微服务,如果是有状态的实现滚动更新,需要把状态数据保存到CMQ,COS等持久化存储里。
Q:腾讯SCF对于容器扩容怎么做到不影响业务下扩容?

A: 腾讯SCF的函数调用由中控invoker模块统一发起,invoker模块知道每次函数调用在容器中的执行延时,执行结果等,且能判断容器是否空闲等;扩容容器时,完成内部函数运行时环境初始化后,才标记容器为空闲状态,可接收调用请求。
Q: 对云函数不是太懂,云函数与函数之间调用是通过http协议吗 还是rpc或者其他方式?

A: 函数之间调用采用http协议,这是业内cloud function的通用做法,内部模块之间使用rpc通信。
Q: 如果一个函数一个容器 那一个项目函数至少几万个吧 这样岂不是要部署上万个容器?

A: 函数被真实调用时,才会去分配容器,同时存在的容器数取决于有多少个函数正在被调用,调用的并发次数是多少,这是云函数的最大价值之一:避免资源闲置。
Q: 代码文件是通过Dockerfile打包进容器的吗?这样构建会不会有点慢,像在线执行代码这类的,感觉都很及时。

A: 通过Dockerfile打包成镜像再下载,确实耗时很长,所以实际运行代码没打包到镜像里,而是直接下发到母机,再将目录挂到容器里面。
以上内容根据2018年12月26日晚微信群分享内容整理。 分享人陈杰,腾讯云架构平台部技术专家。10年云计算经验,现供职于腾讯架构平台部,负责弹性计算及云函数技术研发,致力于提供领先的基础设施平台以提升资源利用率及优化提升程序员开发运维效率。 DockOne每周都会组织定向的技术分享,欢迎感兴趣的同学加微信:liyingjiesa,进群参与,您有想听的话题或者想分享的话题都可以给我们留言。

0 个评论

要回复文章请先登录注册