Tuesday, May 21, 2013

Collecting rsyslog logs using Apache Flume and HBase

rsyslog

rsyslog server (0a908186.cht.local)

CentOS 6.4

Edit the configuration file

$ vim /etc/rsyslog.conf
$ModLoad imuxsock.so
$ModLoad imklog.so
$ModLoad imtcp.so
$InputTCPServerRun 514
$template DailyPerHostLogs,"/var/log/rsyslog/%fromhost%/%$year%%$month%%$day%.log"
*.* -?DailyPerHostLogs;
$ vim /etc/sysconfig/rsyslog
SYSLOGD_OPTIONS="-c 5"

Restart

$ service rsyslog restart

After the client sends a message, check the log

$ tail -f /var/log/rsyslog/d23d6ee0.cht.local/20130515.log
2013-05-15T23:41:17.461288+08:00 d23d6ee0.cht.local d23d6ee0@Wed May 15 23:41:19 CST 2013
2013-05-17T09:06:10+08:00 d23d6ee0 su: pam_unix(su-l:session): session opened for user hdfs by root(uid=0)
2013-05-17T09:06:16+08:00 d23d6ee0 su: pam_unix(su-l:session): session closed for user hdfs

rsyslog client

CentOS 6.4
Edit the configuration file
$ vim /etc/rsyslog.conf
#$ModLoad imtcp.so
authpriv.*       @@0a908186.cht.local:514
$WorkDirectory /var/lib/rsyslog # where to place spool files
$ActionQueueFileName fwdRule1 # unique name prefix for spool files
$ActionQueueMaxDiskSpace 1g   # 1gb space limit (use as much as possible)
$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
$ActionQueueType LinkedList   # run asynchronously
$ActionResumeRetryCount -1    # infinite retries if host is down
Restart
$ service rsyslog restart

Send a test message

$ echo "`hostname`@`date`"|nc -t d241288b 514

Test account login/logout records

$ su - hdfs
$ exit

Flume

CDH3u5, Flume 1.2.0.24

flume installation

$ sudo yum install flume-ng
Name        : flume-ng
Arch        : noarch
Version     : 1.2.0+24.43
Release     : 1
$ sudo yum install flume-ng-agent
Name        : flume-ng-agent
Arch        : noarch
Version     : 1.2.0+24.43
Release     : 1

file location

  • Config Directory: /etc/flume-ng/conf
  • Config File: /etc/flume-ng/conf/flume.conf (this config will be picked up by the Flume agent startup script)
  • Template of user customizable config file: /etc/flume-ng/conf/flume-conf.properties.template (contains a sample config; to use it, copy this file to /etc/flume-ng/conf/flume.conf)
  • Template of user customizable environment file: /etc/flume-ng/conf/flume-env.sh.template (if you modify this file, copy it to /etc/flume-ng/conf/flume-env.sh)
  • Daemon Log Directory: /var/log/flume-ng
  • Default Flume Home: /usr/lib/flume-ng (provided by RPMs and DEBs)
  • Flume Agent startup script: /etc/init.d/flume-ng-agent (provided by RPMs and DEBs)
  • Recommended tar.gz Flume Home: /usr/local/lib/flume-ng (recommended but installation dependent)
  • Flume Wrapper Script: /usr/bin/flume-ng (called by the Flume agent startup script)

flume configuration

Edit the configuration file

$ vim /etc/flume-ng/conf/example.conf
# example.conf: A single-node Flume configuration


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1


# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = ${hostname}
a1.sources.r1.port = 44444


# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Restart

$ sudo -u hdfs flume-ng agent --conf-file /etc/flume-ng/conf/example.conf --name a1 -Dflume.root.logger=INFO,console &
...
13/05/16 00:05:32 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/X.X.X.X:5140]

Send a test message

$ telnet ${hostname} 44444
telnet> Hello World

View the message

13/05/15 23:23:54 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D             Hello World. }

Edit the configuration file

$ vim /etc/flume-ng/conf/rsyslog-log.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = 0a908189.cht.local
# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = timestamp
# sink
a1.sinks.k1.type = FILE_ROLL
a1.sinks.k1.sink.directory = /var/log/flume
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Restart

$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-log.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-log.log 2>&1 &

View the messages

$ tail -f /var/log/flume/1369622697744-1
sshd[1742]: Accepted publickey for root from X.X.X.X port 59503 ssh2
sshd[1742]: pam_unix(sshd:session): session opened for user root by (uid=0)
sshd[1742]: Received disconnect from X.X.X.X: 11: disconnected by user
sshd[1742]: pam_unix(sshd:session): session closed for user root
sshd[1751]: Accepted publickey for root from X.X.X.X port 59505 ssh2
sshd[1751]: pam_unix(sshd:session): session opened for user root by (uid=0)
sshd[1751]: Received disconnect from X.X.X.X: 11: disconnected by user
sshd[1751]: pam_unix(sshd:session): session closed for user root

Edit the configuration file

$ vim /etc/flume-ng/conf/rsyslog-hdfs.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = ${hostname}
# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


# sink
a1.sinks.k1.type = hdfs
# directory path
a1.sinks.k1.hdfs.path = /flume/events/%Y%m%d
# the filename will be {%Prefix}.{%s}
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = log
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy
# hdfs.codeC = gzip
# fileType: currently SequenceFile, DataStream or CompressedStream
#(1)DataStream will not compress output file and please don't set codeC
#(2)CompressedStream requires set hdfs.codeC with an available codeC
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
a1.sinks.k1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS
a1.sinks.k1.hdfs.batchSize = 10000
a1.sinks.k1.hdfs.txnEventMax = 40000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 100
# Number of threads per HDFS sink for scheduling timed file rolling
a1.sinks.k1.hdfs.rollTimerPoolSize = 1
a1.sinks.k1.hdfs.maxOpenFiles = 50
a1.sinks.k1.hdfs.appendTimeout = 10000
a1.sinks.k1.hdfs.callTimeout = 10000
# hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
# hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using
# hdfs.roundUnit), less than current time.
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
# serializer.*


# channel
# Use a channel which buffers events to a file
# -- The component type name, needs to be FILE.
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored
# The maximum size of transaction supported by the channel
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints
agent1.channels.channel1.checkpointInterval = 30000
# Max size (in bytes) of a single log file
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel
agent1.channels.channel1.capacity = 10000000
#keep-alive 3 Amount of time (in sec) to wait for a put operation
#write-timeout 3 Amount of time (in sec) to wait for a write operation
$ sudo -u hdfs hadoop fs -mkdir /flume
$ sudo -u hdfs hadoop fs -chmod 777 /flume

Restart

$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-hdfs.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-hdfs.log 2>&1 &

Edit the configuration file

$ vim /etc/flume-ng/conf/rsyslog-hbase.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = ${hostname}
# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = agent
a1.sources.r1.interceptors.i2.type = timestamp
# sink
#Use the AsyncHBaseSink
a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
#Use the HBaseSink
#a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.k1.table = t1
a1.sinks.k1.columnFamily = fam1
a1.sinks.k1.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
#Use the SimpleHbaseEventSerializer that comes with Flume
#a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
a1.sinks.k1.serializer.incrementRow = iRow
a1.sinks.k1.serializer.incrementColumn = iCol
a1.sinks.k1.serializer.rowPrefix = rlog-
a1.sinks.k1.serializer.suffix = timestamp
a1.sinks.k1.serializer.payloadColumn = body
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Create the HBase table

$ su - hdfs
$ hbase shell
hbase> create 't1', {NAME => 'fam1'}
hbase> put 't1',

Restart

$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-hbase.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-hbase.log 2>&1 &

Query the records

20130520 10:05
hbase> get 't1', 'iRow'
COLUMN                CELL
fam1:iCol            timestamp=1369015910345, value=\x00\x00\x00\x00\x00@\x90\x
                     BB
hbase> count 't1'
Current count: 58000, row: evKey1369012288346
58955 row(s) in 1.9290 seconds
hbase> scan 't1', {STARTROW => 'evKey1369012288346'}
...
May 26 13:26:37 d241288b unix_chkpwd[17982]: password check failed for user (root)
May 26 13:26:37 d241288b sshd[17980]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=61.164.126.60  user=root
May 26 13:26:39 d241288b sshd[17980]: Failed password for root from 61.164.126.60 port 33720 ssh2
...
evKey1369015944037   column=fam1:evtBody, timestamp=1369015955264, value=sshd[2
                     8017]: input_userauth_request: invalid user fluffy
evKey1369015944038   column=fam1:evtBody, timestamp=1369015955337, value=sshd[2
                     8048]: Invalid user library from 200.73.76.20
iRow                 column=fam1:iCol, timestamp=1369015955340, value=\x00\x00\
                     x00\x00\x00@\x94?
983 row(s) in 0.7260 seconds

201210 [Flume-user] syslog source - sinks without datetime/hostname

The source excerpts below show why: SyslogUtils parses the syslog timestamp and hostname out of the message and stores them as event headers ("timestamp", "host"), so the event body that reaches the sinks no longer contains the datetime/hostname.

org.apache.flume.source.SyslogTcpSource
   private SyslogUtils syslogUtils = new SyslogUtils();
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
     ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
     while (buff.readable()) {
       Event e = syslogUtils.extractEvent(buff);
       if (e == null) {
         logger.debug("Parsed partial event, event will be generated when " +
             "rest of the event is received.");
         continue;
       }
       try {
         getChannelProcessor().processEvent(e);
         counterGroup.incrementAndGet("events.success");
       } catch (ChannelException ex) {
         counterGroup.incrementAndGet("events.dropped");
         logger.error("Error writting to channel, event dropped", ex);
       }
     }


   }
 }
org.apache.flume.source.SyslogUtils
 private String timeStamp = null;
 private String hostName = null;
 private String msgBody = null;
 private void formatHeaders() {
    ...
       } else if (grp == SYSLOG_HOSTNAME_POS) {
         hostName = value;
       } else if (grp == SYSLOG_BODY_POS) {
         msgBody = value;
       }
    ...
 }
 Event buildEvent() {
   byte[] body;
   Map<String, String> headers = new HashMap<String, String>();
   ...
   if ((timeStamp != null) && timeStamp.length() > 0) {
     headers.put("timestamp", timeStamp);
   }
   if ((hostName != null) && (hostName.length() > 0)) {
     headers.put("host", hostName);
   }
   ...
   return EventBuilder.withBody(body, headers);
 }
 public Event extractEvent(ChannelBuffer in){
    ...
       case DATA:
         // TCP syslog entries are separated by '\n'
         if (b == '\n') {
           e = buildEvent();
           doneReading = true;
         } else {
           baos.write(b);
         }
 }
org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 private byte[] payload;
 public List<PutRequest> getActions() {
   List<PutRequest> actions = new ArrayList<PutRequest>();
   if(payloadColumn != null){
     byte[] rowKey;
     try {
       switch (keyType) {
         case TS:
           rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
           break;
         case TSNANO:
           rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
           break;
         case RANDOM:
           rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
           break;
         default:
           rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
           break;
       }
       PutRequest putRequest =  new PutRequest(table, rowKey, cf,
           payloadColumn, payload);
// PutRequest(byte[] table, byte[] key, byte[] family, byte[][] qualifiers, byte[][] values)
       actions.add(putRequest);
     } catch (Exception e){
       throw new FlumeException("Could not get row key!", e);
     }
   }
   return actions;
 }
 public void setEvent(Event event) {
     // The event headers + body can be serialized as JSON
    String QUOTE = "\"";
    // this.payload = event.getBody();
    // 20130521, leo
    /*
     {"header":[
     {"h1":"v1"},
     {"h2":"v2"}
     ],
     "body":"b1"
     }
     */
    StringBuffer stb = new StringBuffer("{"+QUOTE+"header"+QUOTE+":[");
    
     for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
       stb.append("{"+QUOTE+entry.getKey()+QUOTE+":"+QUOTE+entry.getValue()+QUOTE+"},");
    }
    stb.deleteCharAt(stb.length()-1);
    stb.append("],");
    stb.append(QUOTE+"body"+QUOTE+":"+QUOTE+new String(event.getBody())+QUOTE+"}");
    this.payload = stb.toString().getBytes();
     // Alternatively, store the event and build a multi-column PutRequest in getActions()
     // (a sketch of this follows below)
     // PutRequest API
     // http://tsunanet.net/~tsuna/asynchbase/1.2.0/
 }
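
As a rough sketch of that alternative (class and field names here are hypothetical, not the shipped serializer), setEvent() can simply keep the Event, and getActions() can turn every header plus the body into a single multi-column PutRequest using the qualifiers/values constructor noted above:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.hbase.async.PutRequest;

public class MultiColumnSerializerSketch {
  private byte[] table;        // initialized elsewhere, as in the original serializer
  private byte[] cf;
  private byte[] rowPrefix;
  private Event currentEvent;

  public void setEvent(Event event) {
    // keep the whole event; header/body handling is deferred to getActions()
    this.currentEvent = event;
  }

  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    try {
      Map<String, String> headers = currentEvent.getHeaders();
      // one qualifier per event header, plus one for the event body
      byte[][] qualifiers = new byte[headers.size() + 1][];
      byte[][] values = new byte[headers.size() + 1][];
      int i = 0;
      for (Map.Entry<String, String> h : headers.entrySet()) {
        qualifiers[i] = h.getKey().getBytes();
        values[i] = h.getValue().getBytes();
        i++;
      }
      qualifiers[i] = "body".getBytes();
      values[i] = currentEvent.getBody();
      // row key: prefix + timestamp, in the spirit of SimpleRowKeyGenerator.getTimestampKey()
      byte[] rowKey = (new String(rowPrefix) + System.currentTimeMillis()).getBytes();
      actions.add(new PutRequest(table, rowKey, cf, qualifiers, values));
    } catch (Exception e) {
      throw new FlumeException("Could not build multi-column PutRequest!", e);
    }
    return actions;
  }
}
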
org.apache.flume.sink.hbase.AsyncHBaseSink
private HBaseClient client;
public Status process() throws EventDeliveryException {
   ...
   try {
     txn = channel.getTransaction();
     txn.begin();
     for (; i < batchSize; i++) {
       Event event = channel.take();
       if (event == null) {
         status = Status.BACKOFF;
         if (i == 0) {
           sinkCounter.incrementBatchEmptyCount();
         } else {
           sinkCounter.incrementBatchUnderflowCount();
         }
         break;
       } else {
         serializer.setEvent(event);
          List<PutRequest> actions = serializer.getActions();
          List<AtomicIncrementRequest> increments = serializer.getIncrements();
         callbacksExpected.addAndGet(actions.size() + increments.size());


         for (PutRequest action : actions) {
           client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
         }
         for (AtomicIncrementRequest increment : increments) {
           client.atomicIncrement(increment).addCallbacks(
                   incrementSuccessCallback, incrementFailureCallback);
         }
       }
     }
   } catch (Throwable e) {
     this.handleTransactionFailure(txn);
     this.checkIfChannelExceptionAndThrow(e);
   }
  ...
}

Customizing the HBase content [github]

Design

  • Add the source host to the row key, e.g.: ${a1.sinks.k1.serializer.rowPrefix}${a1.sinks.k1.serializer.suffix}${srchost}
  • Write the org.apache.flume.Event headers as multiple columns (see the sketch after this list)
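
A minimal sketch of the row key idea, assuming the source hostname is read from the event headers ("host" set by SyslogUtils, or the interceptor's hostHeader, here "agent"); the class and method names are hypothetical, not the actual github implementation. The headers-as-multiple-columns part follows the multi-column PutRequest sketch shown earlier.

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.flume.Event;

public class SourceHostRowKeySketch {
  // rowPrefix mirrors a1.sinks.k1.serializer.rowPrefix ("rlog-"); the suffix is the
  // timestamp (a1.sinks.k1.serializer.suffix = timestamp); the source host is read
  // from the event headers.
  public static byte[] getTimestampSourceHostKey(String rowPrefix, Event event)
      throws UnsupportedEncodingException {
    Map<String, String> headers = event.getHeaders();
    String srcHost = headers.containsKey("host") ? headers.get("host") : headers.get("agent");
    // e.g. rlog-1369638572251-0a90818a
    return (rowPrefix + System.currentTimeMillis() + "-" + srcHost).getBytes("UTF8");
  }
}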

Development

  • library dependency (flume-ng-1.2.0-cdh3u5/lib/)
    • asynchbase-1.2.0.jar
      • package org.hbase.async
    • hbase-0.90.6-cdh3u5.jar
    • hadoop-core-0.20.2-cdh3u5.jar
  • original source [Cloudera archive]
    • flume-ng-hbase-sink-1.2.0-cdh3u5.jar
  • 0.1.0
    • Write the syslog headers to HBase as JSON
  • 0.2.0
    • Write each syslog header to HBase as an individual qualifier/value
    • Add the syslog source hostname to the row key
      • Example: rlog-1369638572251-0a90818a

Deployment


  • Upload the modified flume-ng-hbase-sink-0.2.0.jar to /usr/lib/flume-ng/lib, and rename the original file to flume-ng-hbase-sink-1.2.0-cdh3u5.jar
  • Restart the rsyslog-hbase agent

Testing

20130527 16:20
hbase> count 't1'
5 row(s) in 0.2770 seconds
hbase> scan 't1'
...
rlog-1369642795121-d column=fam1:Facility, timestamp=1369642795401, value=80
23d6ee0
rlog-1369642795121-d column=fam1:Severity, timestamp=1369642795398, value=6
23d6ee0
rlog-1369642795121-d column=fam1:agent, timestamp=1369642795402, value=X.X.X
23d6ee0              .X
rlog-1369642795121-d column=fam1:body, timestamp=1369642795399, value=sshd[1604
23d6ee0              6]: pam_unix(sshd:session): session closed for user root
rlog-1369642795121-d column=fam1:host, timestamp=1369642795400, value=d23d6ee0
23d6ee0
rlog-1369642795121-d column=fam1:timestamp, timestamp=1369642795397, value=1369
23d6ee0              642795121

20130529 14:07

hbase> scan 't1'
rlog-0a90818c-136980 column=fam1:Facility, timestamp=1369806994258, value=80
 6988677
 rlog-0a90818c-136980 column=fam1:Severity, timestamp=1369806994254, value=6
 6988677
 rlog-0a90818c-136980 column=fam1:agent, timestamp=1369806994260, value=10.144.1
 6988677              29.137
 rlog-0a90818c-136980 column=fam1:body, timestamp=1369806994261, value=sshd[2974
 6988677              7]: Received disconnect from 10.144.129.137: 11: disconnec
                      ted by user
 rlog-0a90818c-136980 column=fam1:host, timestamp=1369806994256, value=0a90818c
 6988677
 rlog-0a90818c-136980 column=fam1:timestamp, timestamp=1369806994237, value=1369
 6988677              806983743
 rlog-0a90818c-136980 column=fam1:Facility, timestamp=1369806994261, value=80
 6988678
 rlog-0a90818c-136980 column=fam1:Severity, timestamp=1369806994255, value=6
 6988678
 rlog-0a90818c-136980 column=fam1:agent, timestamp=1369806994262, value=10.144.1
 6988678              29.137
 rlog-0a90818c-136980 column=fam1:body, timestamp=1369806994257, value=sshd[2974
 6988678              7]: pam_unix(sshd:session): session closed for user root
 rlog-0a90818c-136980 column=fam1:host, timestamp=1369806994259, value=0a90818c
 6988678
 rlog-0a90818c-136980 column=fam1:timestamp, timestamp=1369806994239, value=1369
 6988678              806983746