rsyslog
rsyslog server (0a908186.cht.local)
CentOS 6.4
Edit the configuration file
$ vim /etc/rsyslog.conf
$ModLoad imuxsock.so
$ModLoad imklog.so
$ModLoad imtcp.so
$InputTCPServerRun 514
$template DailyPerHostLogs,"/var/log/rsyslog/%fromhost%/%$year%%$month%%$day%.log"
*.* -?DailyPerHostLogs;
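With this template, a message received from d23d6ee0.cht.local on 2013-05-15 ends up in /var/log/rsyslog/d23d6ee0.cht.local/20130515.log, the file tailed in the example below.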
$ vim /etc/sysconfig/rsyslog
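# -c 5 runs rsyslogd at compatibility level 5 (native v5 mode), so the v5-style directives in rsyslog.conf are honored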
SYSLOGD_OPTIONS="-c 5"
Restart the service
$ service rsyslog restart
After the client sends messages, check the log
$ tail -f /var/log/rsyslog/d23d6ee0.cht.local/20130515.log
…
2013-05-15T23:41:17.461288+08:00 d23d6ee0.cht.local d23d6ee0@Wed May 15 23:41:19 CST 2013
…
2013-05-17T09:06:10+08:00 d23d6ee0 su: pam_unix(su-l:session): session opened for user hdfs by root(uid=0)
2013-05-17T09:06:16+08:00 d23d6ee0 su: pam_unix(su-l:session): session closed for user hdfs
rsyslog client
CentOS 6.4
Edit the configuration file
$ vim /etc/rsyslog.conf
#$ModLoad imtcp.so
authpriv.* @@0a908186.cht.local:514
$WorkDirectory /var/lib/rsyslog # where to place spool files
$ActionQueueFileName fwdRule1 # unique name prefix for spool files
$ActionQueueMaxDiskSpace 1g # 1gb space limit (use as much as possible)
$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
$ActionQueueType LinkedList # run asynchronously
$ActionResumeRetryCount -1 # infinite retries if host is down
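The authpriv.* selector above forwards only authentication-related records (su, sshd, and other PAM messages); to ship every facility to the same server, the selector could be broadened, e.g.:
*.* @@0a908186.cht.local:514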
Restart the service
$ service rsyslog restart
Send a test message
$ echo "`hostname`@`date`"|nc -t d241288b 514
Test account login/logout logging
$ su - hdfs
$ exit
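The su/exit above generates authpriv records through PAM; a test message can also be pushed directly into the authpriv facility with logger (a sketch, the message text is arbitrary):
$ logger -p authpriv.notice "rsyslog forwarding test from `hostname`"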
Flume
CDH3u5, Flume 1.2.0.24
flume installation
$ sudo yum install flume-ng
Name : flume-ng
Arch : noarch
Version : 1.2.0+24.43
Release : 1
$ sudo yum install flume-ng-agent
Name : flume-ng-agent
Arch : noarch
Version : 1.2.0+24.43
Release : 1
file location
Config Directory: /etc/flume-ng/conf
Config File: /etc/flume-ng/conf/flume.conf (this config will be picked up by the flume agent startup script)
Template of User Customizable Config File: /etc/flume-ng/conf/flume-conf.properties.template (contains a sample config; to use it, copy this file to /etc/flume-ng/conf/flume.conf)
Template of User Customizable Environment File: /etc/flume-ng/conf/flume-env.sh.template (if you modify this file, copy it to /etc/flume-ng/conf/flume-env.sh)
Daemon Log Directory: /var/log/flume-ng
Default Flume Home: /usr/lib/flume-ng (provided by RPMs and DEBs)
Flume Agent startup script: /etc/init.d/flume-ng-agent (provided by RPMs and DEBs)
Recommended tar.gz Flume Home: /usr/local/lib/flume-ng (recommended but installation dependent)
Flume Wrapper Script: /usr/bin/flume-ng (called by the Flume Agent startup script)
flume configuration
Edit the configuration file
$ vim /etc/flume-ng/conf/example.conf
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = ${hostname}
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Restart the agent
$ sudo -u hdfs flume-ng agent --conf-file /etc/flume-ng/conf/example.conf --name a1 -Dflume.root.logger=INFO,console &
...
13/05/16 00:05:32 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/X.X.X.X:5140]
Send a test message
$ telnet ${hostname} 44444
telnet> Hello World
View the message
13/05/15 23:23:54 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 0D Hello World. }
Edit the configuration file
$ vim /etc/flume-ng/conf/rsyslog-log.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = 0a908189.cht.local
# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = timestamp
# sink
a1.sinks.k1.type = FILE_ROLL
a1.sinks.k1.sink.directory = /var/log/flume
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
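The FILE_ROLL sink starts a new output file every 30 seconds by default, which is why the file under /var/log/flume below carries a timestamp-based name; if fewer, larger files are preferred, the roll interval can be raised, e.g.:
a1.sinks.k1.sink.rollInterval = 3600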
Restart the agent
$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-log.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-log.log 2>&1 &
View the messages
$ tail -f /var/log/flume/1369622697744-1
sshd[1742]: Accepted publickey for root from X.X.X.X port 59503 ssh2
sshd[1742]: pam_unix(sshd:session): session opened for user root by (uid=0)
sshd[1742]: Received disconnect from X.X.X.X: 11: disconnected by user
sshd[1742]: pam_unix(sshd:session): session closed for user root
sshd[1751]: Accepted publickey for root from X.X.X.X port 59505 ssh2
sshd[1751]: pam_unix(sshd:session): session opened for user root by (uid=0)
sshd[1751]: Received disconnect from X.X.X.X: 11: disconnected by user
sshd[1751]: pam_unix(sshd:session): session closed for user root
Edit the configuration file
$ vim /etc/flume-ng/conf/rsyslog-hdfs.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = ${hostname}
# sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
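The %Y%m%d escapes in hdfs.path are resolved from the event's timestamp header. The syslog source fills that header in when it can parse a timestamp from the message, but adding a timestamp interceptor, as the other agent configs here do, is a safer option, e.g.:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
The block that follows is an annotated reference for additional hdfs sink and file channel parameters.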
# sink
a1.sinks.k1.type = hdfs
# directory path
a1.sinks.k1.hdfs.path = /flume/events/%Y%m%d
# the filename will be {%Prefix}.{%s}
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = log
# Compression codec, one of the following: gzip, bzip2, lzo, snappy
# hdfs.codeC = gzip
# fileType: currently SequenceFile, DataStream or CompressedStream
# (1) DataStream will not compress the output file; do not set codeC
# (2) CompressedStream requires hdfs.codeC to be set with an available codec
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
a1.sinks.k1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS
a1.sinks.k1.hdfs.batchSize = 10000
a1.sinks.k1.hdfs.txnEventMax = 40000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 100
# Number of threads per HDFS sink for scheduling timed file rolling
a1.sinks.k1.hdfs.rollTimerPoolSize = 1
a1.sinks.k1.hdfs.maxOpenFiles = 50
a1.sinks.k1.hdfs.appendTimeout = 10000
a1.sinks.k1.hdfs.callTimeout = 10000
# hdfs.kerberosPrincipal   Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKeytab      Kerberos keytab for accessing secure HDFS
# hdfs.round       false   Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
# hdfs.roundValue  1       Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
# hdfs.roundUnit   second  The unit of the round down value: second, minute or hour
# serializer       TEXT    Other options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
# serializer.*
# channel
# Use a channel which buffers events to a file
# The component type name, needs to be FILE
agent1.channels.channel1.type = FILE
# checkpointDir   ~/.flume/file-channel/checkpoint   The directory where the checkpoint file will be stored
# dataDirs        ~/.flume/file-channel/data         The directory where log files will be stored
# The maximum size of transaction supported by the channel
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints
agent1.channels.channel1.checkpointInterval = 30000
# Max size (in bytes) of a single log file
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel
agent1.channels.channel1.capacity = 10000000
# keep-alive      3   Amount of time (in sec) to wait for a put operation
# write-timeout   3   Amount of time (in sec) to wait for a write operation
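With the hdfs sink settings above, a file is rolled every 10 minutes (rollInterval = 600) or after 10,000 events (rollCount), whichever comes first; rollSize = 0 disables size-based rolling, and events are flushed to HDFS in batches of 10,000.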
$ sudo -u hdfs hadoop fs -mkdir /flume
$ sudo -u hdfs hadoop fs -chmod 777 /flume
Restart the agent
$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-hdfs.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-hdfs.log 2>&1 &
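To confirm events are arriving, list the sink's target directory (the date subdirectory depends on when events are received):
$ sudo -u hdfs hadoop fs -ls /flume/events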
Edit the configuration file
$ vim /etc/flume-ng/conf/rsyslog-hbase.conf
# name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = org.apache.flume.source.SyslogTcpSource
a1.sources.r1.port = 514
a1.sources.r1.host = ${hostname}
# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = agent
a1.sources.r1.interceptors.i2.type = timestamp
# sink
# Use the AsyncHBaseSink
a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
# Use the HBaseSink
#a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
a1.sinks.k1.table = t1
a1.sinks.k1.columnFamily = fam1
a1.sinks.k1.batchSize = 5000
# Use the SimpleAsyncHbaseEventSerializer that comes with Flume
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# Use the SimpleHbaseEventSerializer that comes with Flume
#a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
a1.sinks.k1.serializer.incrementRow = iRow
a1.sinks.k1.serializer.incrementColumn = iCol
a1.sinks.k1.serializer.rowPrefix = rlog-
a1.sinks.k1.serializer.suffix = timestamp
a1.sinks.k1.serializer.payloadColumn= body
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Create the HBase table
$ su - hdfs; hbase shell
hbase> create 't1', {NAME => 'fam1'}
hbase> put 't1',
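Besides one row per event, the serializer configured above issues an atomic increment on row iRow, column fam1:iCol for every event, so that cell serves as a running event counter (read back with get 't1', 'iRow' below).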
Restart the agent
$ nohup flume-ng agent --conf-file /etc/flume-ng/conf/rsyslog-hbase.conf --name a1 -Dflume.root.logger=INFO,console > rsyslog-hbase.log 2>&1 &
Query the records
20130520 10:05
hbase> get 't1', 'iRow'
COLUMN CELL
fam1:iCol timestamp=1369015910345, value=\x00\x00\x00\x00\x00@\x90\x
BB
hbase> count 't1'
Current count: 58000, row: evKey1369012288346
58955 row(s) in 1.9290 seconds
hbase> scan 't1', {STARTROW => 'evKey1369012288346'}
...
May 26 13:26:37 d241288b unix_chkpwd[17982]: password check failed for user (root)
May 26 13:26:37 d241288b sshd[17980]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=61.164.126.60 user=root
May 26 13:26:39 d241288b sshd[17980]: Failed password for root from 61.164.126.60 port 33720 ssh2
...
evKey1369015944037 column=fam1:evtBody, timestamp=1369015955264, value=sshd[2
8017]: input_userauth_request: invalid user fluffy
evKey1369015944038 column=fam1:evtBody, timestamp=1369015955337, value=sshd[2
8048]: Invalid user library from 200.73.76.20
iRow column=fam1:iCol, timestamp=1369015955340, value=\x00\x00\
x00\x00\x00@\x94?
983 row(s) in 0.7260 seconds
201210 [Flume-user] syslog source - sinks without datetime/hostname
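The source excerpts below show why the forwarded messages lose their syslog datetime and hostname on the way to the sink: SyslogUtils strips them out of the message body and stores them as the timestamp and host event headers instead.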
org.apache.flume.source.SyslogTcpSource
private SyslogUtils syslogUtils = new SyslogUtils();
public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
while (buff.readable()) {
Event e = syslogUtils.extractEvent(buff);
if (e == null) {
logger.debug("Parsed partial event, event will be generated when " +
"rest of the event is received.");
continue;
}
try {
getChannelProcessor().processEvent(e);
counterGroup.incrementAndGet("events.success");
} catch (ChannelException ex) {
counterGroup.incrementAndGet("events.dropped");
logger.error("Error writting to channel, event dropped", ex);
}
}
}
}
org.apache.flume.source.SyslogUtils
private String timeStamp = null;
private String hostName = null;
private String msgBody = null;
private void formatHeaders() {
...
} else if (grp == SYSLOG_HOSTNAME_POS) {
hostName = value;
} else if (grp == SYSLOG_BODY_POS) {
msgBody = value;
}
...
}
Event buildEvent() {
byte[] body;
Map<String, String> headers = new HashMap<String, String>();
...
if ((timeStamp != null) && timeStamp.length() > 0) {
headers.put("timestamp", timeStamp);
}
if ((hostName != null) && (hostName.length() > 0)) {
headers.put("host", hostName);
}
...
return EventBuilder.withBody(body, headers);
}
public Event extractEvent(ChannelBuffer in){
...
case DATA:
// TCP syslog entries are separated by '\n'
if (b == '\n') {
e = buildEvent();
doneReading = true;
} else {
baos.write(b);
}
}
org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
private byte[] payload;
public List<PutRequest> getActions() {
List<PutRequest> actions = new ArrayList<PutRequest>();
if(payloadColumn != null){
byte[] rowKey;
try {
switch (keyType) {
case TS:
rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
break;
case TSNANO:
rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
break;
case RANDOM:
rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
break;
default:
rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
break;
}
PutRequest putRequest = new PutRequest(table, rowKey, cf,
payloadColumn, payload);
// PutRequest(byte[] table, byte[] key, byte[] family, byte[][] qualifiers, byte[][] values)
actions.add(putRequest);
} catch (Exception e){
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
public void setEvent(Event event) {
// the event header + body can be serialized as JSON
String QUOTE = "\"";
// this.payload = event.getBody();
// 20130521, leo
/*
{"header":[
{"h1":"v1"},
{"h2":"v2"}
],
"body":"b1"
}
*/
StringBuffer stb = new StringBuffer("{"+QUOTE+"header"+QUOTE+":[");
for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
stb.append("{"+QUOTE+entry.getKey()+QUOTE+":"+QUOTE+entry.getValue()+QUOTE+"},");
}
stb.deleteCharAt(stb.length()-1);
stb.append("],");
stb.append(QUOTE+"body"+QUOTE+":"+QUOTE+new String(event.getBody())+QUOTE+"}");
this.payload = stb.toString().getBytes();
// alternatively, store the event and build a multiple-column PutRequest in getActions()
// PutRequest API
// http://tsunanet.net/~tsuna/asynchbase/1.2.0/
}
org.apache.flume.sink.hbase.AsyncHBaseSink
private HBaseClient client;
public Status process() throws EventDeliveryException {
...
try {
txn = channel.getTransaction();
txn.begin();
for (; i < batchSize; i++) {
Event event = channel.take();
if (event == null) {
status = Status.BACKOFF;
if (i == 0) {
sinkCounter.incrementBatchEmptyCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
break;
} else {
serializer.setEvent(event);
List<PutRequest> actions = serializer.getActions();
List<AtomicIncrementRequest> increments = serializer.getIncrements();
callbacksExpected.addAndGet(actions.size() + increments.size());
for (PutRequest action : actions) {
client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
}
for (AtomicIncrementRequest increment : increments) {
client.atomicIncrement(increment).addCallbacks(
incrementSuccessCallback, incrementFailureCallback);
}
}
}
} catch (Throwable e) {
this.handleTransactionFailure(txn);
this.checkIfChannelExceptionAndThrow(e);
}
...
}
Custom HBase content [github]
Design
- add the source host to the row key, e.g.: ${a1.sinks.k1.serializer.rowPrefix}${a1.sinks.k1.serializer.suffix}${srchost}
- write each org.apache.flume.Event header as its own column
Development
- library dependency (flume-ng-1.2.0-cdh3u5/lib/)
- asynchbase-1.2.0.jar
- package org.hbase.async
- hbase-0.90.6-cdh3u5.jar
- hadoop-core-0.20.2-cdh3u5.jar
- original source [Cloudera archive]
- flume-ng-hbase-sink-1.2.0-cdh3u5.jar
- 0.1.0
- write the syslog header into HBase as JSON
- 0.2.0
- write each syslog header into HBase as individual qualifier/value pairs (a sketch follows after this list)
- [ref 200303 Introduction to Syslog Protocol]
- column family:qualifier list
- fam1:Facility
- fam1:Severity
- fam1:agent
- fam1:body
- fam1:host
- fam1:timestamp
- add the syslog source hostname to the row key
- example: rlog-1369638572251-0a90818a
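A minimal sketch of what the 0.2.0 getActions() could look like; currentEvent is an assumed field set by setEvent(), while table, cf and rowPrefix follow the existing SimpleAsyncHbaseEventSerializer fields. Each header and the body become their own single-qualifier PutRequest under a row key of the form rowPrefix + timestamp + "-" + source host:
public List<PutRequest> getActions() {
  List<PutRequest> actions = new ArrayList<PutRequest>();
  // currentEvent is assumed to be stored by setEvent(); its headers carry
  // Facility, Severity, host, timestamp and the interceptor's agent value
  Map<String, String> headers = currentEvent.getHeaders();
  String host = headers.containsKey("host") ? headers.get("host") : "unknown";
  String ts = headers.containsKey("timestamp")
      ? headers.get("timestamp") : String.valueOf(System.currentTimeMillis());
  // e.g. rlog-1369638572251-0a90818a
  byte[] rowKey = (rowPrefix + ts + "-" + host).getBytes();
  // one qualifier per event header
  for (Map.Entry<String, String> entry : headers.entrySet()) {
    actions.add(new PutRequest(table, rowKey, cf,
        entry.getKey().getBytes(), entry.getValue().getBytes()));
  }
  // the raw syslog message goes into fam1:body
  actions.add(new PutRequest(table, rowKey, cf, "body".getBytes(), currentEvent.getBody()));
  return actions;
}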
Deployment
- upload the modified flume-ng-hbase-sink-0.2.0.jar to /usr/lib/flume-ng/lib and rename the original flume-ng-hbase-sink-1.2.0-cdh3u5.jar
- restart the rsyslog-hbase agent
Test
20130527 16:20
hbase> count 't1'
5 row(s) in 0.2770 seconds
hbase> scan 't1'
...
rlog-1369642795121-d column=fam1:Facility, timestamp=1369642795401, value=80
23d6ee0
rlog-1369642795121-d column=fam1:Severity, timestamp=1369642795398, value=6
23d6ee0
rlog-1369642795121-d column=fam1:agent, timestamp=1369642795402, value=X.X.X
23d6ee0 .X
rlog-1369642795121-d column=fam1:body, timestamp=1369642795399, value=sshd[1604
23d6ee0 6]: pam_unix(sshd:session): session closed for user root
rlog-1369642795121-d column=fam1:host, timestamp=1369642795400, value=d23d6ee0
23d6ee0
rlog-1369642795121-d column=fam1:timestamp, timestamp=1369642795397, value=1369
23d6ee0 642795121
20130529 14:07
hbase> scan 't1'
rlog-0a90818c-136980 column=fam1:Facility, timestamp=1369806994258, value=80
6988677
rlog-0a90818c-136980 column=fam1:Severity, timestamp=1369806994254, value=6
6988677
rlog-0a90818c-136980 column=fam1:agent, timestamp=1369806994260, value=10.144.1
6988677 29.137
rlog-0a90818c-136980 column=fam1:body, timestamp=1369806994261, value=sshd[2974
6988677 7]: Received disconnect from 10.144.129.137: 11: disconnec
ted by user
rlog-0a90818c-136980 column=fam1:host, timestamp=1369806994256, value=0a90818c
6988677
rlog-0a90818c-136980 column=fam1:timestamp, timestamp=1369806994237, value=1369
6988677 806983743
rlog-0a90818c-136980 column=fam1:Facility, timestamp=1369806994261, value=80
6988678
rlog-0a90818c-136980 column=fam1:Severity, timestamp=1369806994255, value=6
6988678
rlog-0a90818c-136980 column=fam1:agent, timestamp=1369806994262, value=10.144.1
6988678 29.137
rlog-0a90818c-136980 column=fam1:body, timestamp=1369806994257, value=sshd[2974
6988678 7]: pam_unix(sshd:session): session closed for user root
rlog-0a90818c-136980 column=fam1:host, timestamp=1369806994259, value=0a90818c
6988678
rlog-0a90818c-136980 column=fam1:timestamp, timestamp=1369806994239, value=1369
6988678 806983746