Hadoop Ecosystem

flume install

[jonghee helped me to set up and run flume]

flume version : 1.4.0

Port 3333 is opened on h001.

An agent is started using a shell script called flume-ng which is located in the bin directory of the Flume distribution. You need to specify the agent name, the config directory, and the config file on the command line:

This configuration defines a single agent named agent01. agent01 has a source that listens for data on port 3333, a channel that buffers event data in memory, and a sink that writes event data to HDFS. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched, a flag is passed telling it which named agent to manifest.
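Because one file can hold several named agents, it can help to see which names a file actually defines before passing one to --name. A minimal sketch, assuming the usual key layout where the agent name is everything before the first dot; the function name list_agents is made up for this post and is not part of Flume:

```shell
#!/bin/sh
# list_agents FILE
# Print the distinct agent names defined in a Flume properties file.
# Every key has the form <agent>.<sources|channels|sinks>..., so the
# agent name is the text before the first dot.
list_agents() {
    grep -v '^[[:space:]]*#' "$1" \
        | grep '=' \
        | sed 's/^[[:space:]]*//' \
        | cut -d. -f1 \
        | sort -u
}
```

For example, list_agents $FLUME_HOME/conf/flume.conf prints one agent name per line, and any of those names can be given to --name.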

Given a configuration file, Flume is started as follows (this is the user guide's example, where the file is example.conf and the agent is named a1):

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Note that in a full deployment we would typically include one more option: --conf=<conf-dir>. The <conf-dir> directory would include a shell script flume-env.sh and potentially a log4j properties file. In this example, we pass a Java option to force Flume to log to the console and we go without a custom environment script.

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template


./flume/bin/flume-ng agent --name agent01 --conf $FLUME_HOME --conf-file $FLUME_HOME/conf/flume.conf -Dflume.root.logger=DEBUG,console

./flume/bin/flume-ng agent --conf $FLUME_HOME --conf-file $FLUME_HOME/conf/flume.conf --name agent02 -Dflume.root.logger=DEBUG,console

./flume/bin/flume-ng agent --conf $FLUME_HOME --conf-file $FLUME_HOME/conf/flume.conf --name agent03 -Dflume.root.logger=DEBUG,console

./flume/bin/flume-ng agent --conf $FLUME_HOME --conf-file $FLUME_HOME/conf/flume.conf --name agent04 -Dflume.root.logger=DEBUG,console

13/07/06 21:10:37 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting

13/07/06 21:10:37 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/flume/conf/flume.conf

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Processing:hdfs-Cluster1-sink

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Processing:hdfs-Cluster1-sink

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Processing:hdfs-Cluster1-sink

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Added sinks: hdfs-Cluster1-sink Agent: agent01

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Processing:hdfs-Cluster1-sink

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Processing:hdfs-Cluster1-sink

13/07/06 21:10:37 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent01]

13/07/06 21:10:37 INFO node.AbstractConfigurationProvider: Creating channels

13/07/06 21:10:37 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory

13/07/06 21:10:37 INFO node.AbstractConfigurationProvider: Created channel memoryChannel

13/07/06 21:10:37 INFO source.DefaultSourceFactory: Creating instance of source avroGenSrc, type avro

13/07/06 21:10:37 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-Cluster1-sink, type: hdfs

13/07/06 21:10:37 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false

13/07/06 21:10:37 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [avroGenSrc, hdfs-Cluster1-sink]

13/07/06 21:10:37 INFO node.Application: Starting new configuration:{ sourceRunners:{avroGenSrc=EventDrivenSourceRunner: { source:Avro source avroGenSrc: { bindAddress: h001, port: 3333 } }} sinkRunners:{hdfs-Cluster1-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a89fc68 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }

13/07/06 21:10:37 INFO node.Application: Starting Channel memoryChannel

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: memoryChannel, registered successfully.

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started

13/07/06 21:10:38 INFO node.Application: Starting Sink hdfs-Cluster1-sink

13/07/06 21:10:38 INFO node.Application: Starting Source avroGenSrc

13/07/06 21:10:38 INFO source.AvroSource: Starting Avro source avroGenSrc: { bindAddress: h001, port: 3333 }...

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: hdfs-Cluster1-sink, registered successfully.

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-Cluster1-sink started

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SOURCE, name: avroGenSrc, registered successfully.

13/07/06 21:10:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: avroGenSrc started

13/07/06 21:10:38 INFO source.AvroSource: Avro source avroGenSrc started.

13/07/06 21:11:06 INFO ipc.NettyServer: [id: 0x1e742dce, / => /] OPEN

13/07/06 21:11:06 INFO ipc.NettyServer: [id: 0x1e742dce, / => /] BOUND: /

13/07/06 21:11:06 INFO ipc.NettyServer: [id: 0x1e742dce, / => /] CONNECTED: /

13/07/06 21:11:08 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false

13/07/06 21:11:08 INFO hdfs.BucketWriter: Creating hdfs://h001:9000/user/flume/data/FlumeData.1373112668068.tmp

13/07/06 21:11:09 INFO hdfs.BucketWriter: Renaming hdfs://h001:9000/user/flume/data/FlumeData.1373112668068.tmp to hdfs://h001:9000/user/flume/data/FlumeData.1373112668068

13/07/06 21:11:09 INFO hdfs.BucketWriter: Creating hdfs://h001:9000/user/flume/data/FlumeData.1373112668069.tmp

13/07/06 21:11:28 INFO ipc.NettyServer: [id: 0x019a07ff, / => /] OPEN

13/07/06 21:11:28 INFO ipc.NettyServer: [id: 0x019a07ff, / => /] BOUND: /

13/07/06 21:11:28 INFO ipc.NettyServer: [id: 0x019a07ff, / => /] CONNECTED: /

13/07/06 21:11:28 INFO hdfs.BucketWriter: Renaming hdfs://h001:9000/user/flume/data/FlumeData.1373112668069.tmp to hdfs://h001:9000/user/flume/data/FlumeData.1373112668069

13/07/06 21:11:28 INFO hdfs.BucketWriter: Creating hdfs://h001:9000/user/flume/data/FlumeData.1373112668070.tmp

13/07/06 21:11:58 INFO ipc.NettyServer: [id: 0x4f94aaa1, / => /] OPEN

13/07/06 21:11:58 INFO ipc.NettyServer: [id: 0x4f94aaa1, / => /] BOUND: /

13/07/06 21:11:58 INFO ipc.NettyServer: [id: 0x4f94aaa1, / => /] CONNECTED: /

13/07/06 21:11:58 INFO hdfs.BucketWriter: Renaming hdfs://h001:9000/user/flume/data/FlumeData.1373112668070.tmp to hdfs://h001:9000/user/flume/data/FlumeData.1373112668070

13/07/06 21:12:01 INFO hdfs.BucketWriter: Creating hdfs://h001:9000/user/flume/data/FlumeData.1373112668071.tmp

13/07/06 21:12:02 INFO hdfs.BucketWriter: Renaming hdfs://h001:9000/user/flume/data/FlumeData.1373112668071.tmp to hdfs://h001:9000/user/flume/data/FlumeData.1373112668071

13/07/06 21:12:02 INFO hdfs.BucketWriter: Creating hdfs://h001:9000/user/flume/data/FlumeData.1373112668072.tmp
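In the log above, each file is first created with a .tmp suffix and then renamed when the sink rolls it, so the "Renaming" lines tell you which files are complete. A small sketch for pulling those paths out, assuming the console output was saved to a file (the run above logged to the console only); rolled_files is a hypothetical helper name:

```shell
#!/bin/sh
# rolled_files LOGFILE
# Print the final HDFS path of every file the HDFS sink has rolled,
# i.e. the target of each BucketWriter "Renaming <tmp> to <final>" line.
rolled_files() {
    grep 'BucketWriter: Renaming' "$1" | sed 's/.* to //'
}
```

Running rolled_files flume.log | wc -l then gives the number of completed files so far.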

[ Flume configuration ]

h001 - master agent that finally writes the events to HDFS.


agent01.sources = avroGenSrc

agent01.channels = memoryChannel

agent01.sinks = hdfs-Cluster1-sink

# For each one of the sources, the type is defined

agent01.sources.avroGenSrc.type = avro

agent01.sources.avroGenSrc.bind = h001

agent01.sources.avroGenSrc.port = 3333

# The channel can be defined as follows.

agent01.sources.avroGenSrc.channels = memoryChannel

# Each sink's type must be defined

agent01.sinks.hdfs-Cluster1-sink.type = hdfs

agent01.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://h001:9000/user/flume/data

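# HDFS sink settings take the hdfs. prefix; keys without it are ignored and the defaults apply.

agent01.sinks.hdfs-Cluster1-sink.hdfs.rollInterval = 2

agent01.sinks.hdfs-Cluster1-sink.hdfs.batchSize = 100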

#Specify the channel the sink should use

agent01.sinks.hdfs-Cluster1-sink.channel = memoryChannel

# Each channel's type is defined.

agent01.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent01.channels.memoryChannel.capacity = 100000

agent01.channels.memoryChannel.transactionCapacity = 10000
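A source or sink only works if the channel it names is also listed on the agent's channels line, and a typo there is easy to miss. The following is a rough sketch of a check for that, not a Flume feature: check_channels is a made-up name, and it compares channel names across the whole file rather than per agent.

```shell
#!/bin/sh
# check_channels FILE
# Report every channel that a source or sink refers to but that no
# "<agent>.channels = ..." line declares. Exit status is 1 if any is found.
check_channels() {
    awk -F'[ \t]*=[ \t]*' '
        /^[ \t]*#/ || NF < 2 { next }   # skip comments and lines without key = value
        {
            key = $1
            n = split($2, names, /[ \t]+/)
        }
        key ~ /^[^.]+\.channels$/ {     # agent-level declaration of channels
            for (i = 1; i <= n; i++)
                if (names[i] != "") declared[names[i]] = 1
            next
        }
        key ~ /\.sources\.[^.]+\.channels$/ || key ~ /\.sinks\.[^.]+\.channel$/ {
            for (i = 1; i <= n; i++)
                if (names[i] != "") used[names[i]] = key
        }
        END {
            bad = 0
            for (c in used) {
                if (!(c in declared)) {
                    print "undeclared channel " c " referenced by " used[c]
                    bad = 1
                }
            }
            exit bad
        }
    ' "$1"
}
```

With the h001 configuration above, check_channels conf/flume.conf prints nothing and returns 0, since both the source and the sink name memoryChannel.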

h002, h003, h004 - flume.conf for each leaf agent (agent02 on h002 shown)

agent02.sources = execGenSrc

agent02.channels = memoryChannel

agent02.sinks = avroSink

# For each one of the sources, the type is defined

agent02.sources.execGenSrc.type = exec

agent02.sources.execGenSrc.command = tail -F /home/hadoop/hadoop/logs/hadoop-hadoop-tasktracker-h002.log

agent02.sources.execGenSrc.batchSize = 10

# The channel can be defined as follows.

agent02.sources.execGenSrc.channels = memoryChannel

# Each sink's type must be defined. The avro sink points at the next agent, here the master agent on h001.

agent02.sinks.avroSink.type = avro

agent02.sinks.avroSink.hostname = h001

agent02.sinks.avroSink.port = 3333

agent02.sinks.avroSink.batch-size = 10

#Specify the channel the sink should use

agent02.sinks.avroSink.channel = memoryChannel

# Each channel's type is defined.

agent02.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent02.channels.memoryChannel.capacity = 100000

agent02.channels.memoryChannel.transactionCapacity = 10000
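Only the agent02 file is shown here. One way to get the files for the other hosts is to rewrite the agent name and the tailed log file with sed. This is only a sketch: derive_leaf_conf is a hypothetical helper, and it assumes the tasktracker log on the other hosts follows the same naming pattern (for example hadoop-hadoop-tasktracker-h003.log), which these notes do not confirm.

```shell
#!/bin/sh
# derive_leaf_conf FILE NN
# Print a copy of the agent02 config with the agent renamed to agentNN
# and the tailed log switched to the h0NN tasktracker log.
# NN is the two-digit host number, e.g. 03.
derive_leaf_conf() {
    sed -e "s/agent02/agent$2/g" \
        -e "s/tasktracker-h002\.log/tasktracker-h0$2.log/g" "$1"
}
```

For example, derive_leaf_conf flume.conf 03 > flume-h003.conf writes the agent03 version; the avro sink lines still point at h001:3333, which is what every leaf agent needs.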

[ Referred to https://cwiki.apache.org/confluence/display/FLUME/Getting+Started ]

This is a listing of the implemented sources, sinks, and channels at this time. Each plugin has its own optional and required configuration properties so please see the javadocs (for now).

Component | Type | Description | Implementation Class
Channel | memory | In-memory, fast, non-durable event transport | MemoryChannel
Channel | file | A channel for reading, writing, mapping, and manipulating a file | FileChannel
Channel | jdbc | JDBC-based, durable event transport (Derby-based) | JDBCChannel
Channel | recoverablememory | A durable channel implementation that uses the local file system for its storage | RecoverableMemoryChannel
Channel | org.apache.flume.channel.PseudoTxnMemoryChannel | Mainly for testing purposes. Not meant for production use. | PseudoTxnMemoryChannel
Channel | (custom type as FQCN) | Your own Channel impl. | (custom FQCN)
Source | avro | Avro Netty RPC event source | AvroSource
Source | exec | Execute a long-lived Unix process and read from stdout | ExecSource
Source | netcat | Netcat style TCP event source | NetcatSource
Source | seq | Monotonically incrementing sequence generator event source | SequenceGeneratorSource
Source | org.apache.flume.source.StressSource | Mainly for testing purposes. Not meant for production use. Serves as a continuous source of events where each event has the same payload. The payload consists of some number of bytes (specified by size property, defaults to 500) where each byte has the signed value Byte.MAX_VALUE (0x7F, or 127). | -
Source | syslogudp | - | SyslogUDPSource
Source | org.apache.flume.source.avroLegacy.AvroLegacySource | - | AvroLegacySource
Source | org.apache.flume.source.thriftLegacy.ThriftLegacySource | - | ThriftLegacySource
Source | org.apache.flume.source.scribe.ScribeSource | - | ScribeSource
Source | (custom type as FQCN) | Your own Source impl. | (custom FQCN)
Sink | hdfs | Writes all events received to HDFS (with support for rolling, bucketing, HDFS-200 append, and more) | HDFSEventSink
Sink | org.apache.flume.sink.hbase.HBaseSink | A simple sink that reads events from a channel and writes them to HBase. | org.apache.flume.sink.hbase.HBaseSink
Sink | org.apache.flume.sink.hbase.AsyncHBaseSink | - | org.apache.flume.sink.hbase.AsyncHBaseSink
Sink | logger | Log events at INFO level via configured logging subsystem (log4j by default) | LoggerSink
Sink | avro | Sink that invokes a pre-defined Avro protocol method for all events it receives (when paired with an avro source, forms tiered collection) | AvroSink
Sink | null | /dev/null for Flume - blackhole all events received | NullSink
Sink | (custom type as FQCN) | Your own Sink impl. | (custom FQCN)
ChannelSelector | (custom type) | Your own ChannelSelector impl. | (custom FQCN)
SinkProcessor | load_balance | Provides the ability to load-balance flow over multiple sinks. | LoadBalancingSinkProcessor
SinkProcessor | (custom type as FQCN) | Your own SinkProcessor impl. | (custom FQCN)
Interceptor$Builder | host | - | HostInterceptor$Builder
Interceptor$Builder | (custom type as FQCN) | Your own Interceptor$Builder impl. | (custom FQCN)
EventSerializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | - | SimpleHbaseEventSerializer
EventSerializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | - | SimpleAsyncHbaseEventSerializer
EventSerializer | org.apache.flume.sink.hbase.RegexHbaseEventSerializer | - | RegexHbaseEventSerializer
HbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for HBaseSink. Your own HbaseEventSerializer impl. | (custom FQCN)
AsyncHbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for AsyncHbase sink. Your own AsyncHbaseEventSerializer impl. | (custom FQCN)
EventSerializer$Builder | (custom type as FQCN) | Custom implementation of serializer for all sinks except for HBaseSink and AsyncHBaseSink. Your own EventSerializer$Builder impl. | (custom FQCN)

The flume-ng executable lets you run a Flume NG agent or an Avro client which is useful for testing and experiments. No matter what, you'll need to specify a command (e.g. agent or avro-