open-falcon-agent源码学习

最近学习falcon,看了源码和极客学院的视频解析,画了调用结构、关系,对主要的代码进行了注释

代码地址:https://github.com/beyondskyw...

标签(空格分隔): falcon go

监控数据

  1. 机器性能指标:cpu,mem,网卡,磁盘……

  2. 业务监控

  3. 开源软件状态:Nginx,Redis,MySQL

  4. snmp采集网络设备指标

设计原理

  1. 自发现采集值

  2. 不同类型数据采集分不同goroutine

  3. 进程和端口通过用户配置进行监控

配置文件

  1. hostname和ip默认留空,agent自动探测

  2. hbs和transfer都是配置其rpc地址

  3. collector网卡采集前缀

  4. ignore为true时取消上报

组织结构

  1. cron:间隔执行的代码,即定时任务

  2. funcs:信息采集

  3. g:全局数据结构

  4. http:简单的dashboard的server,获取单机监控指标数据

  5. plugins:插件处理机制

  6. public:静态资源文件

心跳机制

  1. 了解agent、plugin版本信息,方便升级

  2. 获取监听的进程和端口

  3. 获取本机执行的插件列表

与HBS、Transfer交互

调用关系

代码解读

  1. main入口
go cron.InitDataHistory()

// Report this agent's own status.
cron.ReportAgentStatus()
// Sync plugins.
cron.SyncMinePlugins()
// Sync the monitored ports, paths, processes and URLs.
cron.SyncBuiltinMetrics()
// Backdoor for debugging the agent: sync the list of IPs allowed to run shell commands.
cron.SyncTrustableIps()
// Start data collection.
cron.Collect()

// Start the dashboard server.
go http.Start()
  1. ReportAgentStatus:汇报agent本身状态
// 判断hbs配置是否正常,正常则上报agent状态if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" {    // 根据配置的interval间隔上报信息    go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second)}func reportAgentStatus(interval time.Duration) {    for {        // 获取hostname, 出错则错误赋值给hostname        hostname, err := g.Hostname()        if err != nil {            hostname = fmt.Sprintf("error:%s", err.Error())        }        // 请求发送信息        req := model.AgentReportRequest{            Hostname:      hostname,            IP:            g.IP(),            AgentVersion:  g.VERSION,            // 通过shell指令获取plugin版本,能否go实现            PluginVersion: g.GetCurrPluginVersion(),        }        var resp model.SimpleRpcResponse        // 调用rpc接口        err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)        if err != nil || resp.Code != 0 {            log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)        }        time.Sleep(interval)    }}
  1. SyncMinePlugins:同步插件
func syncMinePlugins() {    var (        timestamp  int64 = -1        pluginDirs []string    )    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second    for {        time.Sleep(duration)        hostname, err := g.Hostname()        if err != nil {            continue        }        req := model.AgentHeartbeatRequest{            Hostname: hostname,        }        var resp model.AgentPluginsResponse        // 调用rpc接口,返回plugin        err = g.HbsClient.Call("Agent.MinePlugins", req, &resp)        if err != nil {            log.Println("ERROR:", err)            continue        }        // 保证时间顺序正确        if resp.Timestamp 1. SyncBuiltinMetrics:同步内置metric,包括端口、目录和进程信息

func syncBuiltinMetrics() {
var timestamp int64 = -1
var checksum string = "nil"

duration := time.Duration(g.Config().Heartbeat.Interval) * time.Secondfor {    time.Sleep(duration)    // 监控端口、目录大小、进程    var ports = []int64{}    var paths = []string{}    var procs = make(map[string]map[int]string)    var urls = make(map[string]string)    hostname, err := g.Hostname()    if err != nil {        continue    }    req := model.AgentHeartbeatRequest{        Hostname: hostname,        Checksum: checksum,    }    var resp model.BuiltinMetricResponse    err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)    if err != nil {        log.Println("ERROR:", err)        continue    }    if resp.Timestamp 1. SyncTrustableIps:同步可信IP列表

请求获取远程访问执行shell命令的IP白名单,在通过http/run.go调用shell命令时会判断请求IP是否可信

func syncTrustableIps() {    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second    for {        time.Sleep(duration)        var ips string        err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)        if err != nil {            log.Println("ERROR: call Agent.TrustableIps fail", err)            continue        }        // 设置到本地可信IP列表        g.SetTrustableIps(ips)    }}
  1. FuncsAndInterval:拆分不同的采集函数集,方便通过不同goroutine运行
// 间隔internal时间执行fs中的函数type FuncsAndInterval struct {    Fs       []func() []*model.MetricValue    Interval int}var Mappers []FuncsAndInterval// 根据调用指令类型和是否容易被挂起而分类(通过不同的goroutine去执行,避免相互之间的影响)func BuildMappers() {    interval := g.Config().Transfer.Interval    Mappers = []FuncsAndInterval{        FuncsAndInterval{            Fs: []func() []*model.MetricValue{                AgentMetrics,                CpuMetrics,                NetMetrics,                KernelMetrics,                LoadAvgMetrics,                MemMetrics,                DiskIOMetrics,                IOStatsMetrics,                NetstatMetrics,                ProcMetrics,                UdpMetrics,            },            Interval: interval,        },        // 容易出问题        FuncsAndInterval{            Fs: []func() []*model.MetricValue{                DeviceMetrics,            },            Interval: interval,        },        // 调用相同指令        FuncsAndInterval{            Fs: []func() []*model.MetricValue{                PortMetrics,                SocketStatSummaryMetrics,            },            Interval: interval,        },        FuncsAndInterval{            Fs: []func() []*model.MetricValue{                DuMetrics,            },            Interval: interval,        },        FuncsAndInterval{            Fs: []func() []*model.MetricValue{                UrlMetrics,            },            Interval: interval,        },    }}
  1. Collect:配置信息读取,读取Mapper中的FuncsAndInterval,根据func调用采集函数,采集所有信息(并非先过滤采集项),从所有采集到的数据中过滤ignore的项,并上报到transfer。
func Collect() {    // 配置信息判断    if !g.Config().Transfer.Enabled {        return    }    if len(g.Config().Transfer.Addrs) == 0 {        return    }    // 读取mapper中的FuncsAndInterval集,并通过不同的goroutine运行    for _, v := range funcs.Mappers {        go collect(int64(v.Interval), v.Fs)    }}// 间隔采集信息func collect(sec int64, fns []func() []*model.MetricValue) {    // 启动断续器,间隔执行    t := time.NewTicker(time.Second * time.Duration(sec)).C    for {        1. 采集信息结构

// MetricValue is a single collected sample as it is reported upstream.
type MetricValue struct {
Endpoint string // host name
Metric string // metric identifier, e.g. cpu.idle, mem.memtotal
Value interface{} // collected value
Step int64 // reporting interval of this item
Type string // GAUGE or COUNTER
Tags string // used when configuring alarm strategies
Timestamp int64 // time of this report
}

1. 采集信息组成metricValue结构

func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue {
mv := model.MetricValue{
Metric: metric,
Value: val,
Type: dataType,
}

size := len(tags)if size > 0 {    mv.Tags = strings.Join(tags, ",")}return &mv

}
// GaugeValue builds a MetricValue whose data type is "GAUGE" (a raw value).
func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue {
	const dataType = "GAUGE"
	return NewMetricValue(metric, val, dataType, tags...)
}

// CounterValue builds a MetricValue whose data type is "COUNTER".
func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue {
	const dataType = "COUNTER"
	return NewMetricValue(metric, val, dataType, tags...)
}

1. rpc组件

// SingleConnRpcClient is a thin wrapper around rpc.Client.
type SingleConnRpcClient struct {
sync.Mutex // locked by Call for the duration of each call
rpcClient *rpc.Client // current connection; nil when not connected
RpcServer string // server address dialed over "tcp" by insureConn
Timeout time.Duration // passed to net.JsonRpcClient when dialing
}

// close shuts down the current rpc connection, if any, and resets the field
// to nil so that the next insureConn dials again.
func (this *SingleConnRpcClient) close() {
	if this.rpcClient == nil {
		return
	}

	this.rpcClient.Close()
	this.rpcClient = nil
}

// 保证rpc存在,为空则重新创建, 如果server宕机, 死循环????
func (this *SingleConnRpcClient) insureConn() {
if this.rpcClient != nil {
return
}

var err errorvar retry int = 1for {    if this.rpcClient != nil {        return    }    // 根据timeout和server地址去连接rpc的server    this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)    if err == nil {        return    }    log.Printf("dial %s fail: %v", this.RpcServer, err)    if retry > 6 {        retry = 1    }    time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)    retry++}

}

// rpc client调用hbs函数
func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
// 加锁保证一个agent只与server有一个连接,保证性能
this.Lock()
defer this.Unlock()
// 保证rpc连接可用
this.insureConn()

timeout := time.Duration(50 * time.Second)done := make(chan error)go func() {    err := this.rpcClient.Call(method, args, reply)    done  %v", this.rpcClient, this.RpcServer)    this.close()case err := 1. Transfer部件
// 定义transfer的rpcClient对应Map, transferClients读写锁var (    TransferClientsLock *sync.RWMutex                   = new(sync.RWMutex)    TransferClients     map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{})// 发送数据到随机的transferfunc SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {    rand.Seed(time.Now().UnixNano())    // 随机transferClient发送数据,直到发送成功    for _, i := range rand.Perm(len(Config().Transfer.Addrs)) {        addr := Config().Transfer.Addrs[i]        if _, ok := TransferClients[addr]; !ok {            initTransferClient(addr)        }        if updateMetrics(addr, metrics, resp) {            break        }    }}// 初始化addr对应的transferClientfunc initTransferClient(addr string) {    TransferClientsLock.Lock()    defer TransferClientsLock.Unlock()    TransferClients[addr] = &SingleConnRpcClient{        RpcServer: addr,        Timeout:   time.Duration(Config().Transfer.Timeout) * time.Millisecond,    }}// 调用rpc接口发送metricfunc updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool {    TransferClientsLock.RLock()    defer TransferClientsLock.RUnlock()    err := TransferClients[addr].Call("Transfer.Update", metrics, resp)    if err != nil {        log.Println("call Transfer.Update fail", addr, err)        return false    }    return true}
  1. 采集插件同步
// 插件信息: 路径、修改时间、运行周期(来自plugin插件)type Plugin struct {    FilePath string    MTime    int64    Cycle    int}// 插件map和调度器mapvar (    Plugins              = make(map[string]*Plugin)    PluginsWithScheduler = make(map[string]*PluginScheduler))// 删除不需要的pluginfunc DelNoUsePlugins(newPlugins map[string]*Plugin) {    for currKey, currPlugin := range Plugins {        newPlugin, ok := newPlugins[currKey]        if !ok || currPlugin.MTime != newPlugin.MTime {            deletePlugin(currKey)        }    }}// 添加同步时增加的pluginfunc AddNewPlugins(newPlugins map[string]*Plugin) {    for fpath, newPlugin := range newPlugins {        // 去除重复插件        if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime {            continue        }        // 为新添加的插件新建调度器        Plugins[fpath] = newPlugin        sch := NewPluginScheduler(newPlugin)        PluginsWithScheduler[fpath] = sch        // 启动plugin调度        sch.Schedule()    }}func ClearAllPlugins() {    for k := range Plugins {        deletePlugin(k)    }}func deletePlugin(key string) {    v, ok := PluginsWithScheduler[key]    if ok {        // 暂停调度plugin        v.Stop()        delete(PluginsWithScheduler, key)    }    delete(Plugins, key)}
  1. 插件调度策略
// 持续间隔执行plugintype PluginScheduler struct {    Ticker *time.Ticker    Plugin *Plugin    Quit   chan struct{}}// 根据plugin创建新的schedulefunc NewPluginScheduler(p *Plugin) *PluginScheduler {    scheduler := PluginScheduler{Plugin: p}    scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second)    scheduler.Quit = make(chan struct{})    return &scheduler}// plugin调度,间隔执行PluginRun,除非收到quit消息func (this *PluginScheduler) Schedule() {    go func() {        for {            select {            case <-this.Ticker.C:                PluginRun(this.Plugin)            case <-this.Quit:                this.Ticker.Stop()                return            }        }    }()}// 停止plugin调度func (this *PluginScheduler) Stop() {    close(this.Quit)}// 执行插件,读取插件运行返回数据并上报transferfunc PluginRun(plugin *Plugin) {    timeout := plugin.Cycle*1000 - 500    fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)    if !file.IsExist(fpath) {        log.Println("no such plugin:", fpath)        return    }    debug := g.Config().Debug    if debug {        log.Println(fpath, "running...")    }    cmd := exec.Command(fpath)    var stdout bytes.Buffer    cmd.Stdout = &stdout    var stderr bytes.Buffer    cmd.Stderr = &stderr    cmd.Start()    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)    errStr := stderr.String()    if errStr != "" {        logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")        if _, err = file.WriteString(logFile, errStr); err != nil {            log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err)        }    }    if isTimeout {        // has be killed        if err == nil && debug {            log.Println("[INFO] timeout and kill process", fpath, "successfully")        }        if err != nil {            log.Println("[ERROR] kill process", fpath, "occur error:", err)        }        return    }    if err != nil {        log.Println("[ERROR] exec plugin", fpath, "fail. 
error:", err)        return    }    // exec successfully    data := stdout.Bytes()    if len(data) == 0 {        if debug {            log.Println("[DEBUG] stdout of", fpath, "is blank")        }        return    }    var metrics []*model.MetricValue    err = json.Unmarshal(data, &metrics)    if err != nil {        log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())        return    }    g.SendToTransfer(metrics)}

关键字:Golang, go语言


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部