
Commit b783bcf: initial commit (0 parents)

12 files changed, +667 -0 lines changed

.gitignore (+5)

@@ -0,0 +1,5 @@
.classpath
.project
.settings
*~
/target
README.md (+2)

@@ -0,0 +1,2 @@

mvn package assembly:assembly
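
Given the pom and assembly descriptor below, that command should emit a self-contained job jar under target/ named hadoop-binary-analysis-1.0-SNAPSHOT-job.jar (finalName plus the "job" assembly id). A minimal build-and-run sketch; the HDFS paths are placeholders, and the class name and arguments come from the usage() message in BinaryAnalysisJob:

    # build the job jar (the assembly id "job" is appended to the final name)
    mvn package assembly:assembly

    # run on a Hadoop cluster; /data/binaries and /data/md5-out are example HDFS paths
    hadoop jar target/hadoop-binary-analysis-1.0-SNAPSHOT-job.jar \
        io.covert.binary.analysis.BinaryAnalysisJob /data/binaries /data/md5-out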

pom.xml (+76)

@@ -0,0 +1,76 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>io.covert</groupId>
  <artifactId>hadoop-binary-analysis</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>hadoop-binary-analysis</name>
  <url>http://maven.apache.org</url>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
      <version>1.9.3</version>
    </dependency>
    <dependency>
      <groupId>com.googlecode.json-simple</groupId>
      <artifactId>json-simple</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-exec</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.5.8</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.5.8</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <finalName>${project.name}-${project.version}</finalName>
          <appendAssemblyId>true</appendAssemblyId>
          <descriptors>
            <descriptor>src/main/assembly.xml</descriptor>
          </descriptors>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

src/main/assembly.xml (+38)

@@ -0,0 +1,38 @@
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
  <id>job</id>
  <formats>
    <format>jar</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <dependencySets>
    <dependencySet>
      <unpack>false</unpack>
      <scope>runtime</scope>
      <outputDirectory>lib</outputDirectory>
      <excludes>
        <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
      </excludes>
    </dependencySet>
    <dependencySet>
      <unpack>false</unpack>
      <scope>system</scope>
      <outputDirectory>lib</outputDirectory>
      <excludes>
        <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
      </excludes>
    </dependencySet>
  </fileSets>
  <fileSets>
    <fileSet>
      <directory>${basedir}/target/classes</directory>
      <outputDirectory>/</outputDirectory>
      <excludes>
        <exclude>*.jar</exclude>
      </excludes>
    </fileSet>
  </fileSets>
</assembly>
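
This descriptor puts the project's compiled classes at the jar root and every runtime dependency, unpacked, under lib/, which is the layout Hadoop's job jar classloader expects. A quick sanity check after building (the listing below is illustrative, not actual output):

    jar tf target/hadoop-binary-analysis-1.0-SNAPSHOT-job.jar
    # io/covert/binary/analysis/BinaryAnalysisJob.class
    # io/covert/binary/analysis/BinaryAnalysisMapper.class
    # lib/hadoop-core-0.20.2.jar
    # lib/commons-exec-1.1.jar
    # ...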
src/main/java/io/covert/binary/analysis/BinaryAnalysisJob.java (+71)

@@ -0,0 +1,71 @@
package io.covert.binary.analysis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BinaryAnalysisJob extends Configured implements Tool {

    private static void usage(String msg)
    {
        if(msg != null && !msg.isEmpty())
        {
            System.err.println(msg);
        }
        System.err.println("Usage: hadoop jar JARFILE.jar "+BinaryAnalysisJob.class.getName()+" <inDir> <outDir>");
        System.err.println("  inDir  - HDFS input dir");
        System.err.println("  outDir - HDFS output dir");
        System.exit(-1);
    }

    @Override
    public int run(String[] args) throws Exception {

        if(args.length != 2)
        {
            usage("");
        }

        String inDir = args[0];
        String outDir = args[1];

        Configuration conf = getConf();

        // which OutputParser implementation turns process output into key/value pairs
        conf.set("binary.analysis.output.parser", SimpleOutputParser.class.getName());
        conf.set("binary.analysis.file.extention", ".dat");
        conf.setLong("binary.analysis.execution.timeoutMS", Long.MAX_VALUE);

        // the external program run against each binary; ${file} is substituted
        // with the path of the temp file the mapper writes
        conf.set("binary.analysis.program", "md5sum");
        conf.set("binary.analysis.program.args", "${file}");
        conf.set("binary.analysis.program.args.delim", ",");
        conf.set("binary.analysis.program.exit.codes", "0,1");

        Job job = new Job(conf);
        job.setJobName(BinaryAnalysisJob.class.getName()+" inDir="+inDir+", outDir="+outDir);
        job.setJarByClass(getClass());

        // map-only job: the mapper forks the external program and writes its parsed output
        job.setMapperClass(BinaryAnalysisMapper.class);
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, new Path(inDir));

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));

        job.submit();
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new BinaryAnalysisJob(), args));
    }
}
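
The parser class is pluggable via binary.analysis.output.parser, but the OutputParser interface and SimpleOutputParser live in files not shown in this excerpt. Below is a hypothetical sketch, inferring the method shapes from how BinaryAnalysisMapper calls parse(...) and getResults(); the stdout/stderr parameter types and the class name are assumptions:

    package io.covert.binary.analysis;

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.Map.Entry;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;

    // Hypothetical parser, not part of this commit: the real OutputParser
    // interface is defined elsewhere in the repo. This sketch keeps just the
    // digest portion of md5sum's "<digest>  <path>" stdout, keyed by input key.
    public class Md5OutputParser implements OutputParser<Text, Text>
    {
        private final Collection<Entry<Text, Text>> results = new LinkedList<Entry<Text, Text>>();

        public void parse(Text key, BytesWritable value, String stdOut, String stdErr)
        {
            // the mapper drains getResults() after every parse(), so reset first
            results.clear();
            String trimmed = stdOut.trim();
            if(!trimmed.isEmpty())
            {
                String digest = trimmed.split("\\s+")[0];
                results.add(new SimpleEntry<Text, Text>(new Text(key), new Text(digest)));
            }
        }

        public Collection<Entry<Text, Text>> getResults()
        {
            return results;
        }
    }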
src/main/java/io/covert/binary/analysis/BinaryAnalysisMapper.java (+160)

@@ -0,0 +1,160 @@
package io.covert.binary.analysis;

import io.covert.util.Utils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class BinaryAnalysisMapper<K, V> extends Mapper<Text, BytesWritable, K, V>
{
    Logger LOG = Logger.getLogger(getClass());

    OutputParser<K, V> parser = null;
    String program;
    String[] args;
    int[] exitCodes;
    Map<String, Object> substitutionMap = new HashMap<String, Object>();
    long uuid = 0;
    File workingDir;
    String fileExtention;
    long timeoutMS;

    @Override
    @SuppressWarnings("unchecked")
    protected void setup(Context context) throws IOException, InterruptedException
    {
        Configuration conf = context.getConfiguration();
        try {
            parser = (OutputParser<K, V>) Class.forName(conf.get("binary.analysis.output.parser")).newInstance();
        } catch (Exception e) {
            throw new IOException("Could not create parser", e);
        }

        fileExtention = conf.get("binary.analysis.file.extention", ".dat");
        timeoutMS = conf.getLong("binary.analysis.execution.timeoutMS", Long.MAX_VALUE);
        program = conf.get("binary.analysis.program");
        args = conf.get("binary.analysis.program.args").split(conf.get("binary.analysis.program.args.delim", ","));
        String[] codes = conf.get("binary.analysis.program.exit.codes").split(",");
        exitCodes = new int[codes.length];
        for(int i = 0; i < codes.length; ++i)
        {
            exitCodes[i] = Integer.parseInt(codes[i]);
        }

        JobConf job = new JobConf(conf);

        // set up a task-private working dir next to the job's local dir
        workingDir = new File(new File(job.getJobLocalDir()).getParent(), context.getTaskAttemptID().toString());
        workingDir.mkdir();

        LOG.info("Working dir: ");
        for(File f : workingDir.listFiles())
        {
            LOG.info(f);
        }

        LOG.info("Working dir parent: ");
        for(File f : workingDir.getParentFile().listFiles())
        {
            LOG.info(f);
        }

        LOG.info("job.getLocalDirs(): ");
        for(String f : job.getLocalDirs())
        {
            LOG.info(f);
        }

        // prepare binary for exec
    }

    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException
    {
        uuid++;

        // write the binary to a local temp file so the external program can read it
        long fileCreationOverheadMS = System.currentTimeMillis();
        File binaryFile = new File(workingDir, uuid+fileExtention);
        FileOutputStream fileOut = new FileOutputStream(binaryFile);
        // only the first getLength() bytes of a BytesWritable's backing array are valid
        fileOut.write(value.getBytes(), 0, value.getLength());
        fileOut.close();
        fileCreationOverheadMS = System.currentTimeMillis() - fileCreationOverheadMS;
        context.getCounter("STATS", "fileCreationOverheadMS").increment(fileCreationOverheadMS);

        substitutionMap.put("file", binaryFile.toString());

        context.getCounter("STATS", "Process execution attempts").increment(1);
        LOG.info("Running: "+program+" args="+Arrays.toString(args)+", substitutionMap="+substitutionMap+", exitCodes="+Arrays.toString(exitCodes)+", workingDir="+workingDir);

        long programExecutionTimeMS = System.currentTimeMillis();
        ExecutorThread exec = new ExecutorThread(program, args, substitutionMap, exitCodes, timeoutMS, workingDir);
        exec.start();
        // report progress while the external process runs so the task isn't killed
        while(exec.isAlive())
        {
            context.progress();
            Utils.sleep(100);
        }
        exec.join();
        programExecutionTimeMS = System.currentTimeMillis() - programExecutionTimeMS;
        context.getCounter("STATS", "programExecutionTimeMS").increment(programExecutionTimeMS);

        LOG.info("Process completed, elapsed="+programExecutionTimeMS+"ms, ExitCode="+exec.getExitCode()+", processStarted="+exec.isProcessStarted()+", processDestroyed="+exec.isProcessDestroyed());
        if(exec.getExecuteException() != null)
        {
            LOG.error(exec.getExecuteException());
        }

        if(exec.isProcessDestroyed())
        {
            context.getCounter("STATS", "Process destroyed").increment(1);
        }
        else
        {
            context.getCounter("STATS", "Process NOT destroyed").increment(1);
        }

        context.getCounter("EXIT CODES", ""+exec.getExitCode()).increment(1);

        if(exec.isProcessStarted())
        {
            context.getCounter("STATS", "Process started").increment(1);
            long parseOverheadMS = System.currentTimeMillis();
            // hand stdOut and stdErr to the parser
            parser.parse(key, value, exec.getStdOut(), exec.getStdErr());
            Collection<Entry<K, V>> values = parser.getResults();
            for(Entry<K, V> e : values)
            {
                context.write(e.getKey(), e.getValue());
            }
            parseOverheadMS = System.currentTimeMillis() - parseOverheadMS;
            context.getCounter("STATS", "parseOverheadMS").increment(parseOverheadMS);
        }
        else
        {
            context.getCounter("STATS", "Process NOT started").increment(1);
        }

        long fileDeletionOverheadMS = System.currentTimeMillis();
        binaryFile.delete();
        fileDeletionOverheadMS = System.currentTimeMillis() - fileDeletionOverheadMS;
        context.getCounter("STATS", "fileDeletionOverheadMS").increment(fileDeletionOverheadMS);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        // cleanup working dir: remove any leftover temp files, then the dir itself
        for(File f : workingDir.listFiles())
        {
            f.delete();
        }
        workingDir.delete();
    }
}
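
The job consumes SequenceFiles of Text keys (a name per binary) and BytesWritable values (the raw bytes), per the map signature and setInputFormatClass above. This commit doesn't include a loader for getting files into that shape; a hypothetical helper, sketched against the stock Hadoop 0.20 SequenceFile API (the class name and argument layout are made up):

    package io.covert.binary.analysis;

    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    // Hypothetical helper, not part of this commit: packs a local directory of
    // binaries into the Text/BytesWritable SequenceFile layout the job expects.
    // Usage: DirToSequenceFile <localDir> <hdfsSeqFile>
    public class DirToSequenceFile
    {
        public static void main(String[] args) throws IOException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Writer writer = SequenceFile.createWriter(
                    fs, conf, new Path(args[1]), Text.class, BytesWritable.class);
            try
            {
                for(File f : new File(args[0]).listFiles())
                {
                    if(!f.isFile()) continue;
                    byte[] bytes = new byte[(int) f.length()];
                    DataInputStream in = new DataInputStream(new FileInputStream(f));
                    try { in.readFully(bytes); } finally { in.close(); }
                    // key = file name, value = raw file bytes
                    writer.append(new Text(f.getName()), new BytesWritable(bytes));
                }
            }
            finally
            {
                writer.close();
            }
        }
    }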
