日誌收集系統應該說是到達一定規模的公司的標配了,一個能滿足業務需求、運維成本低、穩定的日誌收集系統對於運維的同學和日誌使用方的同學都是非常nice的。然而這時理想中的日誌收集系統,現實往往不是這樣的...本篇的主要內容是:首先吐槽一下公司以前的日誌收集和上傳;介紹新的實時日誌收集系統架構;用go語言實現。澄清一下,並不是用go語言實現全部,比如用到卡夫卡肯定不能重寫一個kafka吧……

logagent所有代碼已上傳到github:https://github.com/zingp/logagent。

1 老系統吐槽


我司以前的日誌收集系統概述如下:

日誌收集的頻率有每小時收集一次、每5分鐘收集一次、實時收集三種。大部分情況是每小時收集上傳一次。

(1) 每5分鐘上傳一次和每小時上傳一次的情況是這樣的:

每臺機器上都需要部署一個日誌收集agengt,部署一個日誌上傳agent,每臺機器都需要掛載hadoop集羣的客戶端。

日誌收集agent負責切割日誌,上傳agent整點的時候啓動利用hadoop客戶端,將切割好的前1小時或前5分鐘日誌打包上傳到hadoop集羣。

(2) 實時傳輸的情況是這樣的

每臺機器上部署另一個agent,該agent實時收集日誌傳輸到kafka。

看到這裏你可能都看不下去了,這麼複雜臃腫費勁的日誌收集系統是怎麼設計出來的?額...先辯解一下,這套系統有4年以上的歷史了,當時的解決方案確實有限。辯解完之後還是得吐槽一下系統存在的問題:

(1) 首先部署在每臺機器上的agent沒有做統一的配置入口,需要根據不同業務到不同機器上配置,運維成本太大;十臺機器也就罷了,問題是現在有幾萬臺機器,幾千個服務。

(2) 最無語的是針對不同的hadoop集羣,需要掛載多個hadoop客戶端,也就是存在一臺機器上部署幾個hadoop客戶端的情況。運維成本太大……

(3) 沒做限流,整點的時候傳輸壓力變大。某些機器有很多日誌,一到整點壓力就上來了。無圖無真相,我們來看下:

CPU:看綠色的線條

海量日誌實時收集系統架構設計與go語言實現


負載:

海量日誌實時收集系統架構設計與go語言實現


網卡:

海量日誌實時收集系統架構設計與go語言實現


這組機器比較典型(這就是前文說的有多個hadoop客戶端的情況),截圖是凌晨至上午的時間段,還未到真正的高峯期。不過總體上可看出整點的壓力是明顯比非正點高很多的,已經到了不能忍的地步。

(4) 省略n條吐槽……

2 新系統架構


首先日誌收集大可不必在客戶端分爲1小時、5分鐘、實時這幾種頻率,只需要實時一種就能滿足前面三種需求。

其次可以砍掉在機器上掛載hadoop客戶端,放在其他地方做日誌上傳hadoop流程。

第三,做統一的配置管理系統,提供友好的web界面,用戶只需要在web界面上配置一組service需要收集的日誌,便可通知該組service下的所有機器上的日誌收集agent。

第四,流量削峯。應該說實時收集可以避免舊系統整點負載過大情況,但依舊應該做限流功能,防止高峯期agent過度消耗資源影響業務。

第五,日誌補傳...

實際上公司有的部門在用flume做日誌收集,但覺得太重。經過一段時間調研和結合自身業務特點,利用開源軟件在適當做些開發會比較好。go應該擅長做這個事,而且方便運維。好了,附上架構圖。

海量日誌實時收集系統架構設計與go語言實現


將用go實現logagent,Web,transfer這個三個部分。

logagent主要負責按照配置實時收集日誌發送到kafka,此外還需watch etcd中的配置,如改變,需要熱更新。

web部分主要用於更新etcd中的配置,etcd已提供接口,我們只需要集成到資源管理系統或CMDB系統的管理界面中去即可。

transfer 做的是消費kafka隊列中的日誌,發送到es/hadoop/storm中去。

3 實現logagent


3.1 配置設計


首先思考下logagent的配置文件內容:

etcd_addr = 10.134.123.183:2379 # etcd 地址
etcd_timeout = 5 # 連接etcd超時時間
etcd_watch_key = /logagent/%s/logconfig # etcd key 格式
kafka_addr = 10.134.123.183:9092 # 卡夫卡地址
thread_num = 4 # 線程數
log = ./log/logagent.log # agent的日誌文件
level = debug # 日誌級別
# 監聽哪些日誌,日誌限流大小,發送到卡夫卡的哪個topic 這個部分可以放到etcd中去。


如上所說,監聽哪些日誌,日誌限流大小,發送到卡夫卡的哪個topic 這個部分可以放到etcd中去。etcd中存儲的value格式設計如下:

`[
{
"service":"test_service",
"log_path": "/search/nginx/logs/ping-android.shouji.sogou.com_access_log", "topic": "nginx_log",
"send_rate": 1000
},
{
"service":"srv.android.shouji.sogou.com",
"log_path": "/search/nginx/logs/srv.android.shouji.sogou.com_access_log","topic": "nginx_log",
"send_rate": 2000
}
]`
- "service":"服務名稱",
- "log_path": "應該監聽的日誌文件",
- "topic": "kfk topic",
- "send_rate": "日誌條數限制"  


其實可以將更多的配置放入etcd中,根據自身業務情況可自行定義,本次就做如此設計,接下來可以寫解析配置文件的代碼了。

config.go

package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
type AppConfig struct {
EtcdAddr string
EtcdTimeOut int
EtcdWatchKey string
KafkaAddr string
ThreadNum int
LogFile string
LogLevel string
}
var appConf = &AppConfig{}
func initConfig(file string) (err error) {
conf, err := config.NewConfig("ini", file)
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
appConf.EtcdAddr = conf.String("etcd_addr")
appConf.EtcdTimeOut = conf.DefaultInt("etcd_timeout", 5)
appConf.EtcdWatchKey = conf.String("etcd_watch_key")
appConf.KafkaAddr = conf.String("kafka_addr")
appConf.ThreadNum = conf.DefaultInt("thread_num", 4)
appConf.LogFile = conf.String("log")
appConf.LogLevel = conf.String("level")
return
} 


代碼主要定義了一個AppConf結構體,然後讀取配置文件,存放到結構體中。

此外,還有部分配置在etcd中,需要做兩件事,第一次啓動程序時將配置從etcd拉取下來;然後啓動一個協程去watch etcd中的配置是否更改,如果更改需要拉取並更新到內存中。代碼如下:

etcd.go:

package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/astaxie/beego/logs"
client "github.com/coreos/etcd/clientv3"
)
var (
confChan = make(chan string, 10)
cli *client.Client
waitGroup sync.WaitGroup
)
func initEtcd(addr []string, keyFormat string, timeout time.Duration) (err error) {
// init a global var cli and can not close
cli, err = client.New(client.Config{
Endpoints: addr,
DialTimeout: timeout,
})
if err != nil {
fmt.Println("connect etcd error:", err)
return
}
logs.Debug("init etcd success")
// defer cli.Close() //can not close
var etcdKeys []string
ips, err := getLocalIP()
if err != nil {
fmt.Println("get local ip error:", err)
return
}
for _, ip := range ips {
key := fmt.Sprintf(keyFormat, ip)
etcdKeys = append(etcdKeys, key)
}
// first, pull conf from etcd
for _, key := range etcdKeys {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, key)
cancel()
if err != nil {
fmt.Println("get etcd key failed, error:", err)
continue
}
for _, ev := range resp.Kvs {
// return result is not string
confChan fmt.Printf("etcd key = %s , etcd value = %s", ev.Key, ev.Value)
}
}
waitGroup.Add(1)
// second, start a goroutine to watch etcd
go etcdWatch(etcdKeys)
return
}
// watch etcd
func etcdWatch(keys []string) {
defer waitGroup.Done()
var watchChans []client.WatchChan
for _, key := range keys {
rch := cli.Watch(context.Background(), key)
watchChans = append(watchChans, rch)
}
for {
for _, watchC := range watchChans {
select {
case wresp := for _, ev := range wresp.Events {
confChan logs.Debug("etcd key = %s , etcd value = %s", ev.Kv.Key, ev.Kv.Value)
}
default:
}
}
time.Sleep(time.Second)
}
}
//GetEtcdConfChan is func get etcd conf add to chan
func GetEtcdConfChan() chan string {
return confChan
}  


其中,有一個比較個性化的設計,就是一臺主機對應的etcd 中的key我們設置成/logagent/本機ip/logconfig的格式,因此還需要一個獲取本機IP的功能,注意一臺機器可能存在多個IP。

ip.go:

package main
import (
"fmt"
"net"
)
// var a slice for ip addr
var ipArray []string
func getLocalIP() (ips []string, err error) {
ifaces, err := net.Interfaces()
if err != nil {
fmt.Println("get ip interfaces error:", err)
return
}
for _, i := range ifaces {
addrs, errRet := i.Addrs()
if errRet != nil {
continue
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
if ip.IsGlobalUnicast() {
ips = append(ips, ip.String())
}
}
}
}
return
}


3.2 初始化kafka


初始化kafka很簡單,就是創建kafka實例,提供發送日誌功能。只不過發送是併發的。

package main
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
var kafkaSend = &KafkaSend{}
type Message struct {
line string
topic string
}
type KafkaSend struct {
client sarama.SyncProducer
lineChan chan *Message
}
func initKafka(kafkaAddr string, threadNum int) (err error) {
kafkaSend, err = NewKafkaSend(kafkaAddr, threadNum)
return
}
// NewKafkaSend is
func NewKafkaSend(kafkaAddr string, threadNum int) (kafka *KafkaSend, err error) {
kafka = &KafkaSend{
lineChan: make(chan *Message, 10000),
}
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // wait kafka ack
config.Producer.Partitioner = sarama.NewRandomPartitioner // random partition
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{kafkaAddr}, config)
if err != nil {
logs.Error("init kafka client err: %v", err)
return
}
kafka.client = client
for i := 0; i < threadNum; i++ {
fmt.Println("start to send kfk")
waitGroup.Add(1)
go kafka.sendMsgToKfk()
}
return
}
func (k *KafkaSend) sendMsgToKfk() {
defer waitGroup.Done()
for v := range k.lineChan {
msg := &sarama.ProducerMessage{}
msg.Topic = v.topic
msg.Value = sarama.StringEncoder(v.line)
_, _, err := k.client.SendMessage(msg)
if err != nil {
logs.Error("send massage to kafka error: %v", err)
return
}
}
}
func (k *KafkaSend) addMessage(line string, topic string) (err error) {
k.lineChan return
}


3.3 實時讀取日誌,發送到kafka


用到第三方包:"github.com/hpcloud/tail"。將每個監聽的日誌,都抽象成一個對象。

package main
import (
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/astaxie/beego/logs"
"github.com/hpcloud/tail"
)
// TailObj is TailMgr's instance
type TailObj struct {
tail *tail.Tail
offset int64
logConf LogConfig
secLimit *SecondLimit
exitChan chan bool
}
var tailMgr *TailMgr
//TailMgr to manage tailObj
type TailMgr struct {
tailObjMap map[string]*TailObj
lock sync.Mutex
}
// NewTailMgr init TailMgr obj
func NewTailMgr() *TailMgr {
return &TailMgr{
tailObjMap: make(map[string]*TailObj, 16),
}
}
//AddLogFile to Add tail obj
func (t *TailMgr) AddLogFile(conf LogConfig) (err error) {
t.lock.Lock()
defer t.lock.Unlock()
_, ok := t.tailObjMap[conf.LogPath]
if ok {
err = fmt.Errorf("duplicate filename:%s", conf.LogPath)
return
}
tail, err := tail.TailFile(conf.LogPath, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // read to tail
MustExist: false, //file does not exist, it does not return an error
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
tailObj := &TailObj{
tail: tail,
offset: 0,
logConf: conf,
secLimit: NewSecondLimit(int32(conf.SendRate)),
exitChan: make(chan bool, 1),
}
t.tailObjMap[conf.LogPath] = tailObj
waitGroup.Add(1)
go tailObj.readLog()
return
}
func (t *TailMgr) reloadConfig(logConfArr []LogConfig) (err error) {
for _, conf := range logConfArr {
tailObj, ok := t.tailObjMap[conf.LogPath]
if !ok {
err = t.AddLogFile(conf)
if err != nil {
logs.Error("add log file failed:%v", err)
continue
}
continue
}
tailObj.logConf = conf
tailObj.secLimit.limit = int32(conf.SendRate)
t.tailObjMap[conf.LogPath] = tailObj
}
for key, tailObj := range t.tailObjMap {
var found = false
for _, newValue := range logConfArr {
if key == newValue.LogPath {
found = true
break
}
}
if found == false {
logs.Warn("log path :%s is remove", key)
tailObj.exitChan delete(t.tailObjMap, key)
}
}
return
}
// Process hava two func get new log conf and reload conf
func (t *TailMgr) Process() {
for conf := range GetEtcdConfChan() {
logs.Debug("log conf: %v", conf)
var logConfArr []LogConfig
err := json.Unmarshal([]byte(conf), &logConfArr)
if err != nil {
logs.Error("unmarshal failed, err: %v conf :%s", err, conf)
continue
}
err = t.reloadConfig(logConfArr)
if err != nil {
logs.Error("reload config from etcd failed: %v", err)
continue
}
logs.Debug("reload config from etcd success")
}
}
func (t *TailObj) readLog() {
for line := range t.tail.Lines {
if line.Err != nil {
logs.Error("read line error:%v ", line.Err)
continue
}
lineStr := strings.TrimSpace(line.Text)
if len(lineStr) == 0 || lineStr[0] == '\n' {
continue
}
kafkaSend.addMessage(line.Text, t.logConf.Topic)
t.secLimit.Add(1)
t.secLimit.Wait()
select {
case logs.Warn("tail obj is exited: config:", t.logConf)
return
default:
}
}
waitGroup.Done()
}
func runServer() {
tailMgr = NewTailMgr()
tailMgr.Process()
waitGroup.Wait()
} 


此處設計了一個限流功能,邏輯大概如下:設置閾值A,如閾值爲1000條,如果這秒鐘已經發送1000條,那麼這一秒剩下的時間就sleep。limit.go代碼如下:

package main
import (
"sync/atomic"
"time"
"github.com/astaxie/beego/logs"
)
// SecondLimit to limit num in one second
type SecondLimit struct {
unixSecond int64
curCount int32
limit int32
}
// NewSecondLimit to init a SecondLimit obj
func NewSecondLimit(limit int32) *SecondLimit {
secLimit := &SecondLimit{
unixSecond: time.Now().Unix(),
curCount: 0,
limit: limit,
}
return secLimit
}
// Add is func to
func (s *SecondLimit) Add(count int) {
sec := time.Now().Unix()
if sec == s.unixSecond {
atomic.AddInt32(&s.curCount, int32(count))
return
}
atomic.StoreInt64(&s.unixSecond, sec)
atomic.StoreInt32(&s.curCount, int32(count))
}
// Wait to limit num
func (s *SecondLimit) Wait() bool {
for {
sec := time.Now().Unix()
if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount >= s.limit {
time.Sleep(time.Millisecond)
logs.Debug("limit is runing, limit: %d s.curCount:%d", s.limit, s.curCount)
continue
}
if sec != atomic.LoadInt64(&s.unixSecond) {
atomic.StoreInt64(&s.unixSecond, sec)
atomic.StoreInt32(&s.curCount, 0)
}
logs.Debug("limit is exited")
return false
}
}


此外,寫日誌的代碼非主要代碼,這裏就不介紹了。所有代碼均上傳到github上,如有興趣可前去clone,地址已經在文章開頭處給出。

出處:http://www.cnblogs.com/zingp/p/9365010.html
相關文章