对接云商资源,以腾讯云cvm为例。并做到可以分页和限速
https://console.cloud.tencent.com/cam
在访问管理→用户→用户列表→新建用户。然后再赋予权限。得到SecretId和SecretKey可以用于api的访问。
查询不同region对应的代码。用于做实验的主机位于上海,所以代码是ap-shanghai
package region import "github.com/infraboard/cmdb/utils" var Regions = []utils.EnumDescribe{ {Name: "Bangkok", Value: "ap-bangkok", Describe: "曼谷"}, {Name: "Beijing", Value: "ap-beijing", Describe: "北京"}, {Name: "Chengdu", Value: "ap-chengdu", Describe: "成都"}, {Name: "Chongqing", Value: "ap-chongqing", Describe: "重庆"}, {Name: "GuangzhouOpen", Value: "ap-guangzhou-open", Describe: "广州Open"}, {Name: "Guangzhou", Value: "ap-guangzhou", Describe: "广州"}, {Name: "HongKong", Value: "ap-hongkong", Describe: "中国香港"}, {Name: "Mumbai", Value: "ap-mumbai", Describe: "孟买"}, {Name: "Seoul", Value: "ap-seoul", Describe: "首尔"}, {Name: "Shanghai", Value: "ap-shanghai", Describe: "上海"}, {Name: "Nanjing", Value: "ap-nanjing", Describe: "南京"}, {Name: "ShanghaiFSI", Value: "ap-shanghai-fsi", Describe: "上海金融"}, {Name: "ShenzhenFSI", Value: "ap-shenzhen-fsi", Describe: "深圳金融"}, {Name: "Singapore", Value: "ap-singapore", Describe: "新加坡"}, {Name: "Tokyo", Value: "ap-tokyo", Describe: "东京"}, {Name: "Frankfurt", Value: "eu-frankfurt", Describe: "法兰克福"}, {Name: "Moscow", Value: "eu-moscow", Describe: "莫斯科"}, {Name: "Ashburn", Value: "na-ashburn", Describe: "阿什本"}, {Name: "SiliconValley", Value: "na-siliconvalley", Describe: "硅谷"}, {Name: "Toronto", Value: "na-toronto", Describe: "多伦多"}, }测试成功,可以正常返回账号ID
=== RUN TestClient client_test.go:15: 100009615835 --- PASS: TestClient (0.20s) PASS初始化操作器
package cvm import ( "github.com/infraboard/mcube/logger" "github.com/infraboard/mcube/logger/zap" cvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312" ) // 与云商交换的操作器 type CVMOperator struct { Client *cvm.Client log logger.Logger account string } func NewCVMOperator(client *cvm.Client, account string) *CVMOperator { return &CVMOperator{ Client: client, log: zap.L().Named("provider.txyun.cvm"), account: account, } }获取的数据是json的格式,需要进行装换
func (o *CVMOperator) transferSet(items []*cvm.Instance) *host.HostSet { set := host.NewHostSet() for i := range items { set.Add(o.transferOne(items[i])) } return set } func (o *CVMOperator) transferOne(ins *cvm.Instance) *host.Host { r := host.NewDefaultHost() b := r.Resource.Meta b.CreateAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.CreatedTime)) b.ResourceId = utils.PtrStrV(ins.InstanceId) b.SerialNumber = utils.PtrStrV(ins.Uuid) i := r.Resource.Spec i.Vendor = resource.VENDOR_TENCENT i.Region = o.client.GetRegion() i.Zone = utils.PtrStrV(ins.Placement.Zone) i.Owner = o.account i.ExpireAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.ExpiredTime)) i.Type = utils.PtrStrV(ins.InstanceType) i.Name = utils.PtrStrV(ins.InstanceName) if ins.InternetAccessible != nil { i.BandWidth = int32(tea.Int64Value(ins.InternetAccessible.InternetMaxBandwidthOut)) } i.Cpu = int32(utils.PtrInt64(ins.CPU)) i.Memory = int32(utils.PtrInt64(ins.Memory)) pm := mapping.PrasePayMode(tea.StringValue(ins.InstanceChargeType)) r.Resource.Cost.PayMode = &pm r.Resource.Status.PublicIp = strings.Join(utils.SlicePtrStrv(ins.PublicIpAddresses), ",") r.Resource.Status.PrivateIp = strings.Join(utils.SlicePtrStrv(ins.PrivateIpAddresses), ",") r.Resource.Status.Phase = utils.PtrStrV(ins.InstanceState) r.Resource.Tags = transferTags(ins.Tags) r.Spec.OsName = utils.PtrStrV(ins.OsName) r.Spec.ImageId = utils.PtrStrV(ins.ImageId) return r } // 对transferOne方法中的tag进行转换 func transferTags(tags []*cvm.Tag) (ret []*resource.Tag) { for i := range tags { ret = append(ret, resource.NewGroupTag( utils.PtrStrV(tags[i].Key), utils.PtrStrV(tags[i].Value)), ) } return }补充工具,读取指针的值,对transferOne中转换的云商数据进行检验
package utils func PtrStrV(v *string) string { if v == nil { return "" } return *v } func PtrInt64(v *int64) int64 { if v == nil { return 0 } return *v } func PtrInt32(v *int32) int32 { if v == nil { return 0 } return *v } func PtrFloat64(v *float64) float64 { if v == nil { return 0 } return *v } func SlicePtrStrv(items []*string) []string { vs := []string{} for i := range items { v := PtrStrV(items[i]) if v != "" { vs = append(vs, v) } } return vs }对时间进行格式转换
import ( "strings" "time" "github.com/infraboard/mcube/logger/zap" ) const ( ISO8601_FORMAT = "2006-01-02T15:04:05Z" DEFAULT_TIME_MINITE_FORMAT = "2006-01-02T15:04Z" TIME_SECOND_FORMAT_MOD1 = "2006-01-02 15:04:05" ) func ParseDefaultSecondTime(t string) int64 { return ParseTime(ISO8601_FORMAT, t) } func ParseDefaultMiniteTime(t string) int64 { return ParseTime(DEFAULT_TIME_MINITE_FORMAT, t) } func ParseSecondMod1Time(t string) int64 { if t == "0000-00-00 00:00:00" { return 0 } return ParseTime(TIME_SECOND_FORMAT_MOD1, t) } func ParseTime(format, t string) int64 { t = strings.TrimSpace(t) if t == "" { return 0 } ts, err := time.Parse(format, t) if err != nil { zap.L().Errorf("parse time %s error, %s", t, err) return 0 } return ts.Unix() }对付费模式进行转换
var ( // CDB付费类型,可能的返回值:0-包年包月;1-按量计费 // 负载均衡实例的计费类型,PREPAID:包年包月,POSTPAID_BY_HOUR:按量计费 // 实例计费模式。取值范围:<br><li>`PREPAID`:表示预付费,即包年包月<br><li>`POSTPAID_BY_HOUR`:表示后付费,即按量计费<br><li>`CDHPAID`:`CDH`付费,即只对`CDH`计费,不对`CDH`上的实例计费。<br><li>`SPOTPAID`:表示竞价实例付费。 // 磁盘付费模式。取值范围:<br><li>PREPAID:预付费,即包年包月<br><li>POSTPAID_BY_HOUR:后付费,即按量计费。 // 弹性公网IP的网络计费模式。注意,传统账户类型账户的弹性公网IP没有网络计费模式属性,值为空。 // 注意:此字段可能返回 null,表示取不到有效值。 // 包括: // <li><strong>BANDWIDTH_PREPAID_BY_MONTH</strong></li> // <p style="padding-left: 30px;">表示包月带宽预付费。</p> // <li><strong>TRAFFIC_POSTPAID_BY_HOUR</strong></li> // <p style="padding-left: 30px;">表示按小时流量后付费。</p> // <li><strong>BANDWIDTH_POSTPAID_BY_HOUR</strong></li> // <p style="padding-left: 30px;">表示按小时带宽后付费。</p> // <li><strong>BANDWIDTH_PACKAGE</strong></li> // <p style="padding-left: 30px;">表示共享带宽包。</p> // 注意:此字段可能返回 null,表示取不到有效值。 // 订单付费模式:prePay 预付费 postPay后付费 riPay预留实例 PAY_TYPE_STATUS_MAP = map[string]resource.PayMode{ "包年包月": resource.PayMode_PRE_PAY, "0": resource.PayMode_PRE_PAY, "PREPAID": resource.PayMode_PRE_PAY, "prePay": resource.PayMode_PRE_PAY, "1": resource.PayMode_POST_PAY, "POSTPAID_BY_HOUR": resource.PayMode_POST_PAY, "postPay": resource.PayMode_POST_PAY, "SPOTPAID": resource.PayMode_POST_PAY, "按量计费": resource.PayMode_POST_PAY, "riPay": resource.PayMode_RESERVED_PAY, } ) func PrasePayMode(s string) resource.PayMode { if v, ok := PAY_TYPE_STATUS_MAP[s]; ok { return v } return resource.PayMode_PRE_PAY }补充查询云商资源的具体实现
// 查询cvm列表(只拉去一页), 查询完成后转换为标准的Host对象 // https://console.cloud.tencent.com/api/explorer?Product=cvm&Version=2017-03-12&Action=DescribeInstances func (o *CVMOperator) QueryCvm(ctx context.Context, req *cvm.DescribeInstancesRequest) (*host.HostSet, error) { // 1.调用cvm客户端 查询cvm实例列表 // 返回的resp是一个DescribeInstancesResponse的实例,与请求对象对应 response, err := o.client.DescribeInstances(req) if _, ok := err.(*errors.TencentCloudSDKError); ok { return nil, err } // 2.转换为cdmb Host对象 set := o.transferSet(response.Response.InstanceSet) return set, nil }单页cvm资源查询,先加载依赖
package cvm_test import ( "context" "fmt" "gitee.com/wendao365/mycmdb/apps/host" "gitee.com/wendao365/mycmdb/provider/tx/connectivity" "gitee.com/wendao365/mycmdb/provider/tx/cvm" "github.com/infraboard/mcube/logger/zap" "testing" ) var ( op *cvm.CVMOperator ctx = context.Background() ) func init() { zap.DevelopmentSetup() req := new(connectivity.TencentCloudClient) ac, err := req.Account() if err != nil { panic(err) } op = cvm.NewCVMOperator(req.CvmClient(), ac) } func TestQueryCvm(t *testing.T) { req := cvm.NewDescribeInstancesRequest() set, err := op.QueryCvm(ctx, req) if err != nil { t.Fatal(err) } t.Log(set) }测试结果
=== RUN TestQueryCvm operator_test.go:36: items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}} --- PASS: TestQueryCvm (0.81s)分页判断采取是否有下一页的方式来判断, 比如请求的当前页是20条,表示还有下一页, 0-19条代表没有下一页。
构建分页请求,带上默认的分页参数
// 带分页功能的查询 // 要把所有资源全部同步下来, 需要客户端拉起所有页面的实例数据 // 我们需要实现一个带分页查询功能的一个客户端, 这个客户端能把所有的页面都查询完 // 云商由速率限制? 比如每秒请求10页数据, 可以超过速率限制 // pager里面是需要控制 云商接口的访问频率 func (o *CVMOperator) PageCvmQuery() *CvmPager { return NewCvmPager(o, &CvmPagerRequest{PageSize: 20, PageNumber: 1}) }使用令牌桶进行限流,算法描述:
假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中(每秒会有r个令牌放入桶中);假设桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;服务请求时,尝试从桶里面获取一个token, 可以里面返回失败,或者直到有可用token放入具体实现可以直接调包
type CvmPagerRequest struct { PageSize int64 PageNumber int64 } func (req *CvmPagerRequest) Offset() int64 { return (req.PageNumber - 1) * req.PageSize } // 跟web界面逻辑一页 // 1. 从第一页开始查询, 100 // 2. 一直点Next(下一页)? 什么时候就结束了,没有下一页了喃? // 2.1 req.page_number+1 // // 3. // // 3.1. 根据Total 和当前已经的offset进行比较, 需要云商返回的Total要准确 // 3.2. 根据当前页是否满了, 比如一页20 0-19(代表没有下一页) type CvmPager struct { op *CVMOperator req *cvm.DescribeInstancesRequest log logger.Logger tb *tokenbucket.Bucket hasNext bool } func NewCvmPager(op *CVMOperator, args *CvmPagerRequest) *CvmPager { req := cvm.NewDescribeInstancesRequest() req.Limit = &args.PageSize offset := args.Offset() req.Offset = &offset return &CvmPager{ // 默认是由下一页的 hasNext: true, op: op, req: req, log: zap.L().Named("pager.cvm"), tb: tokenbucket.NewBucketWithRate(0.1, 1), } } // 判断是否由下一页 func (c *CvmPager) Next() bool { if c.hasNext { c.tb.Wait(1) return true } // 等待知道有多个令牌可用,直接取出 // time.Sleep(1 * time.Second) return false } // 请求该页的数据 func (c *CvmPager) Scan(ctx context.Context, set *host.HostSet) error { resp, err := c.op.QueryCvm(ctx, c.req) if err != nil { return err } set.Items = resp.Items // 修正Next的结果,tea是把指针读成值 c.log.Debugf("resp length: %d, page size: %d", resp.Length(), tea.Int64Value(c.req.Limit)) if resp.Length() < int(tea.Int64Value(c.req.Limit)) { c.hasNext = false } else { // 调整到下一页 offset := tea.Int64Value(c.req.Offset) + tea.Int64Value(c.req.Limit) c.req.Offset = &offset c.hasNext = true } return nil }测试结果,测试成功,会显示查询多少页,多少条
=== RUN TestPageCvmQuery 2023-01-26 18:21:12 DEBUG [pager.cvm] cvm/cvm_pagger.go:77 resp length: 1, page size: 20 items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}} --- PASS: TestPageCvmQuery (0.51s) PASS相关标签: