mirror of https://github.com/google/pebble
				
				
				
			
		
			
				
	
	
		
			156 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			156 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| # Run a fuzz test to verify robustness against corrupted/malicious data.
 | |
| 
 | |
| import sys
 | |
| import time
 | |
| import zipfile
 | |
| import random
 | |
| import subprocess
 | |
| 
 | |
| Import("env", "malloc_env")
 | |
| 
 | |
def set_pkgname(src, dst, pkgname):
    """Copy a .proto file, swapping the package placeholder for a real package.

    src and dst are converted with str() and used as file paths. Every
    occurrence of the comment '// package name placeholder' in src is
    replaced with 'package <pkgname>;' and the result is written to dst.

    Raises AssertionError if src does not contain the placeholder; dst is
    not touched in that case.
    """
    placeholder = '// package name placeholder'

    # Context managers close both handles deterministically, so the output
    # is fully flushed before anything else reads the generated file.
    with open(str(src)) as infile:
        data = infile.read()

    # Raised explicitly (instead of a bare assert) so the check still runs
    # when Python is started with -O.
    if placeholder not in data:
        raise AssertionError("%s does not contain %r" % (str(src), placeholder))

    with open(str(dst), 'w') as outfile:
        outfile.write(data.replace(placeholder, 'package %s;' % pkgname))
 | |
| 
 | |
# Each fuzz target needs its own copy of the AllTypes message, distinguished
# by package name. The copies are produced by set_pkgname() and then handed
# to the nanopb generator together with a matching .options file.
def _pkgname_action(pkgname):
    """Return a Command action that rewrites source[0] into target[0] under pkgname."""
    return lambda target, source, env: set_pkgname(source[0], target[0], pkgname)

# proto2: static and pointer (malloc) variants
env.Command("alltypes_static.proto", "#alltypes/alltypes.proto",
            _pkgname_action('alltypes_static'))
env.Command("alltypes_pointer.proto", "#alltypes/alltypes.proto",
            _pkgname_action('alltypes_pointer'))

env.NanopbProto(["alltypes_pointer", "alltypes_pointer.options"])
env.NanopbProto(["alltypes_static", "alltypes_static.options"])

# proto3: the same two variants
env.Command("alltypes_proto3_static.proto", "#alltypes_proto3/alltypes.proto",
            _pkgname_action('alltypes_proto3_static'))
env.Command("alltypes_proto3_pointer.proto", "#alltypes_proto3/alltypes.proto",
            _pkgname_action('alltypes_proto3_pointer'))

env.NanopbProto(["alltypes_proto3_pointer", "alltypes_proto3_pointer.options"])
env.NanopbProto(["alltypes_proto3_static", "alltypes_proto3_static.options"])

# proto2 once more, for the callback-based variant
env.Command("alltypes_callback.proto", "#alltypes/alltypes.proto",
            _pkgname_action('alltypes_callback'))
env.NanopbProto(["alltypes_callback", "alltypes_callback.options"])
 | |
| 
 | |
# Sources compiled locally and linked into every fuzz program.
_common_sources = (
    "random_data.c",
    "validation.c",
    "flakystream.c",
    "alltypes_pointer.pb.c",
    "alltypes_static.pb.c",
    "alltypes_callback.pb.c",
    "alltypes_proto3_pointer.pb.c",
    "alltypes_proto3_static.pb.c",
)
common_objs = [env.Object(source_name) for source_name in _common_sources]
common_objs.append("$COMMON/malloc_wrappers.o")

# nanopb core objects in their "_with_malloc" flavour, followed by the shared objects.
objs_malloc = [
    "$COMMON/pb_encode_with_malloc.o",
    "$COMMON/pb_decode_with_malloc.o",
    "$COMMON/pb_common_with_malloc.o",
]
objs_malloc.extend(common_objs)

# nanopb core objects in their plain flavour, followed by the shared objects.
objs_static = [
    "$COMMON/pb_encode.o",
    "$COMMON/pb_decode.o",
    "$COMMON/pb_common.o",
]
objs_static.extend(common_objs)
 | |
| 
 | |
# Stand-alone fuzz tester, linked against the malloc-enabled objects.
fuzz = malloc_env.Program(["fuzztest.c"] + objs_malloc)

# The seed comes from the wall clock, so each build run uses a different one.
# Embedded targets get a tenth of the iterations.
seed = int(time.time())
iterations = 100 if env.get('EMBEDDED') else 1000
env.RunTest(fuzz, ARGS = [str(seed), str(iterations)])

# Message generator, linked against the static objects.
generate_message = malloc_env.Program(["generate_message.c"] + objs_static)

# Run the generator with the same seed, then run the fuzzer with the
# generator's output listed alongside it.
env.RunTest(generate_message, ARGS = [str(seed)])
env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"])
 | |
| 
 | |
| # Run against the latest corpus from ossfuzz
 | |
| # This allows quick testing against regressions and also lets us more
 | |
| # completely test slow embedded targets. To reduce runtime, only a subset
 | |
| # of the corpus is fuzzed each time.
 | |
def run_against_corpus(target, source, env):
    """SCons action: pipe a random sample of a zipped corpus through a program.

    source[0] is the program to execute and source[1] is a zip archive of
    input files. If env contains TEST_RUNNER, that value is prepended to the
    command line. Each sampled archive entry (directory entries are skipped)
    is truncated to FUZZTEST_BUFSIZE bytes and written to the program's stdin.

    The sample size is env["FUZZTEST_CORPUS_SAMPLESIZE"] if set, otherwise
    100 when env.get('EMBEDDED') is truthy and 4096 otherwise; it is capped
    at the number of files in the archive.

    Returns None after writing the number of executed entries to target[0],
    or the first non-zero exit status, in which case target[0] is not written.
    """
    args = [str(source[0])]
    if "TEST_RUNNER" in env:
        args = [env["TEST_RUNNER"]] + args

    if "FUZZTEST_CORPUS_SAMPLESIZE" in env:
        samplesize = int(env["FUZZTEST_CORPUS_SAMPLESIZE"])
    elif env.get('EMBEDDED'):
        samplesize = 100
    else:
        samplesize = 4096

    # The input size limit does not change per file, so resolve it once.
    # Preprocessor define values may be given as strings; a slice bound
    # must be an integer.
    maxsize = 256*1024
    defines = env.get('CPPDEFINES', {})
    if isinstance(defines, dict):
        maxsize = defines.get('FUZZTEST_BUFSIZE', maxsize)
        if isinstance(maxsize, str):
            maxsize = int(maxsize)

    count = 0
    with zipfile.ZipFile(str(source[1]), 'r') as corpus:
        files = [n for n in corpus.namelist() if not n.endswith('/')]
        files = random.sample(files, min(samplesize, len(files)))
        for filename in files:
            sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename))
            sys.stdout.flush()

            count += 1

            data_in = corpus.read(filename)[:maxsize]

            # Started outside the try block: if the program cannot be launched
            # at all there is no process to wait for, so the error propagates.
            process = subprocess.Popen(args, stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            # Defaults so the failure report below still works when
            # communicate() raises before returning any output.
            stdout = stderr = b''
            try:
                stdout, stderr = process.communicate(input = data_in)
            except OSError as e:
                # errno 22 (EINVAL) is tolerated and the exit status is still
                # checked. NOTE(review): presumably raised when the child
                # exits before consuming all of its stdin - confirm.
                if e.errno != 22:
                    raise
                print("Warning: OSError 22 when running with input " + filename)
            result = process.wait()

            if result != 0:
                print(stdout + stderr)
                print('\033[31m[FAIL]\033[0m   Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1]))
                return result

    # Marker file recording how many entries were run.
    with open(str(target[0]), 'w') as marker:
        marker.write(str(count))
    print('\033[32m[ OK ]\033[0m   Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)")
 | |
| 
 | |
# Run the fuzzer against each bundled archive; the ".fuzzed" target is the
# marker file written by run_against_corpus.
for _archive in ("corpus.zip", "regressions.zip"):
    env.Command(_archive + ".fuzzed", [fuzz, _archive], run_against_corpus)
 | |
| 
 | |
# Build separate fuzzers for each test case.
# Having them separate speeds up control flow based fuzzer engines.
# These are mainly used by oss-fuzz project.
def _single_case_fuzzer(base_env, define, program, objs):
    """Clone base_env, set the CPPDEFINE to '1' and build fuzztest.c as program.

    fuzztest.c is compiled to "<program>.o" in the cloned environment and
    linked with objs. Returns the cloned environment.
    """
    case_env = base_env.Clone()
    case_env.Append(CPPDEFINES = {define: '1'})
    case_env.Program(program,
        [case_env.Object(program + ".o", "fuzztest.c")] + objs)
    return case_env

env_proto2_static = _single_case_fuzzer(
    env, 'FUZZTEST_PROTO2_STATIC', "fuzztest_proto2_static", objs_static)

env_proto2_pointer = _single_case_fuzzer(
    malloc_env, 'FUZZTEST_PROTO2_POINTER', "fuzztest_proto2_pointer", objs_malloc)

env_proto3_static = _single_case_fuzzer(
    env, 'FUZZTEST_PROTO3_STATIC', "fuzztest_proto3_static", objs_static)

env_proto3_pointer = _single_case_fuzzer(
    malloc_env, 'FUZZTEST_PROTO3_POINTER', "fuzztest_proto3_pointer", objs_malloc)

env_io_errors = _single_case_fuzzer(
    malloc_env, 'FUZZTEST_IO_ERRORS', "fuzztest_io_errors", objs_malloc)
 | |
| 
 |