How to build your own PoC framework - part 2

Author: w7ay@Knownsec 404 Team

Chinese version: https://paper.seebug.org/913/

Related reading: How to build your own PoC framework - the use of Pocsuite3

In this article, I want to draw on Pocsuite to implement my own PoC framework.

First of all, let's pick a nice name, such as Captain America, Thor, Ant-Man or Thanos. If you like playing Dota, you can also call it Mountain King, Blademaster or Priestess of the Moon.

I call it AirPoc because I'm lazy.

The next step is to design its functions. Let's assume that there is a "rabbit security alliance". We plan to develop a blockchain-based PoC verification framework named AirPoc, which is only responsible for the websites within the "rabbit security alliance".

When an AirPoc node finds a vulnerability, the URL and the PoC are shared via a block and then verified by other random nodes. If the verification succeeds, the node earns "air currency", and the owner of the affected website pays "air currency" as compensation.

First, just imagine what kind of PoC framework we need:

  1. Simple and cross-platform
  2. Easy for more people to contribute to
  3. Easy to integrate into other software
  4. High verification speed and accuracy
  5. I don't know what else to include in the framework, but the first thing is to build a runnable one!

Of course, I didn't add the blockchain; it's just a joke.

Detail

We use Python (3.7, the latest version at the time of writing) as the programming language and decide to avoid third-party dependencies.

It is cool to contribute to open source projects like sqlmap, Metasploit and Routersploit. So I decided to publish AirPoc's PoCs on GitHub, so that they can be called via an online API. That's cool!

It is also easy to integrate into other software. If that software uses Python, it can import AirPoc as a package. For other software, AirPoc provides an RPC interface to call. Besides, if you don't have a Python environment, you can also use pyinstaller to package it. Not relying on other libraries is our design principle, which helps avoid many problems.

To achieve high verification speed and accuracy, we need to build a concurrency model based on multithreading and coroutines, which I will describe later.

AirPoc's framework

Before we carry out the plan, we also need to design the code structure. For a perfectionist programmer, a good code structure is the first step in building a big project. We create the following directories: env is the directory for managing packages in a virtual environment, lib is the directory for storing the core files, and pocs is the directory for storing PoC files. A file named main.py is used as the entry point.

Just as a building depends on its foundations, the basic framework is the base for our future work. We don't need to write the specific functions yet, but we should understand what each "foundation" function is for. The main.py file holds an initial framework.

import os
import time


def banner():
    # Raw string, so the backslashes in the ASCII art are kept literally.
    msg = r'''
     ___   _   _____    _____   _____   _____
    /   | | | |  _  \  |  _  \ /  _  \ /  ___|
   / /| | | | | |_| |  | |_| | | | | | | |
  / / | | | | |  _  /  |  ___/ | | | | | |
 / /  | | | | | | \ \  | |     | |_| | | |___
/_/   |_| |_| |_|  \_\ |_|     \_____/ \_____| {}
'''.format(version)
    print(msg)


def init(config: dict):
    print("[*] target:{}".format(config["url"]))


def end():
    print("[*] shutting down at {0}".format(time.strftime("%X")))


def start():
    pass


def main():
    banner()
    config = {
        "url": "https://www.seebug.org/"
    }
    init(config)
    start()
    end()


if __name__ == '__main__':
    version = "v0.00000001"
    main()


However, as you can see, the version number is as low as the amount of bitcoin in my wallet, so we need to add some material.

Singleton

In the initialization stage of a software project, we need to store a lot of environment-related information: for example, the current execution path, the location of the PoC directory, the location of the output directory, and so on.

All of this information has one thing in common: it should be loaded once and then used directly without change. The "singleton" pattern in software engineering fits that need very well.

Fortunately, a Python module is a natural singleton: its code is executed the first time it is imported, and the resulting module object is cached in sys.modules. Later imports of that module reuse the cached object without executing the code again. Therefore, we only need to define the related functions and data in a module to get a singleton object.
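The singleton behaviour of modules can be checked with a small experiment. The sketch below is only an illustration: the module name airpoc_demo_data and the temporary directory are made up for the demo and are not part of AirPoc.

```python
import importlib
import os
import sys
import tempfile

# Write a throwaway module to a temporary directory (hypothetical name).
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "airpoc_demo_data.py"), "w") as f:
    f.write("SETTINGS = {}\n")
sys.path.insert(0, workdir)
importlib.invalidate_caches()

# Import the same module twice, as two different files of a project would.
first = importlib.import_module("airpoc_demo_data")
second = importlib.import_module("airpoc_demo_data")

# State written through one reference is visible through the other,
# because both names point at the single object cached in sys.modules.
first.SETTINGS["root"] = "/tmp/airpoc"
print(first is second)
print(second.SETTINGS)
```

Both names refer to the same module object, which is why shared data can simply live at module level.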

We create a new data.py in the lib directory to store this information, and put the version information there as well.

import os

PATHS_ROOT = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
PATHS_POCS = os.path.join(PATHS_ROOT, "pocs")
PATHS_OUTPUT = os.path.join(PATHS_ROOT, "output")

VERSION = "v0.0000001"

We follow the PEP 8 convention of uppercase letters and underscores for these constants. To mark the difference from the previous version, we symbolically remove one 0 from the version number: our "bitcoin" has increased 10 times.
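The later listings also import POCS, WORKER and CONF from lib.data, although the data.py snippet does not define them. A minimal sketch of how they might be declared is given here; the names come from those imports, but the concrete types are assumptions inferred from how the names are used (append, put/get, and dictionary lookups).

```python
import queue

# Hypothetical additions to lib/data.py. The types are assumptions.
POCS = []               # loaded PoC modules, filled during init()
WORKER = queue.Queue()  # pending (target, poc) tasks for the worker threads
CONF = {}               # runtime configuration, e.g. CONF["requests"]
```

Because the module is a singleton, every file that imports these names shares the same list, queue and dictionary.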

Dynamic module

With the environment-related problems solved, let's take a look at how to load modules dynamically. As discussed before, we expect PoCs to be loadable both locally and from remote sites such as GitHub.

There are two cases here. If you load a module from a file path, you can use __import__() directly. Loading remotely is more complicated: according to the Python documentation, we have to implement a "Finder" and a "Loader". See https://docs.python.org/en-us/3/reference/import.html for details.
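To make the "Finder" and "Loader" roles concrete, here is a minimal sketch built on the standard importlib machinery. It is not AirPoc's loader: the dictionary REMOTE_SOURCES stands in for source code that a real finder would download, and the module name remote_poc_demo is made up.

```python
import importlib
import importlib.abc
import importlib.util
import sys

# Stand-in for PoC source fetched from a remote site (assumption).
REMOTE_SOURCES = {
    "remote_poc_demo": "def verify(arg, **kwargs):\n    return {'url': arg}\n",
}


class StringLoader(importlib.abc.Loader):
    """Loader: executes the source inside the new module's namespace."""

    def create_module(self, spec):
        return None  # fall back to the default module creation

    def exec_module(self, module):
        source = REMOTE_SOURCES[module.__name__]
        code = compile(source, "remote://" + module.__name__, "exec")
        exec(code, module.__dict__)


class StringFinder(importlib.abc.MetaPathFinder):
    """Finder: decides whether it can supply the requested module."""

    def find_spec(self, fullname, path, target=None):
        if fullname not in REMOTE_SOURCES:
            return None  # let the normal import machinery continue
        return importlib.util.spec_from_loader(fullname, StringLoader())


# Register the finder after the built-in ones.
sys.meta_path.append(StringFinder())

poc = importlib.import_module("remote_poc_demo")
result = poc.verify("http://example.com")
print(result)
```

Appending to sys.meta_path means the custom finder is consulted only after the built-in finders fail, so ordinary imports are unaffected.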

Of course, you can also download the file from the remote site and then load it locally. But we can simply use the loader code that Pocsuite already provides.

Create a new lib/loader.py file

import hashlib
import importlib
import importlib.util  # submodule, must be imported explicitly
from importlib.abc import Loader


def get_md5(value):
    if isinstance(value, str):
        value = value.encode(encoding='UTF-8')
    return hashlib.md5(value).hexdigest()


def load_string_to_module(code_string, fullname=None):
    try:
        module_name = 'pocs_{0}'.format(get_md5(code_string)) if fullname is None else fullname
        file_path = 'airpoc://{0}'.format(module_name)
        poc_loader = PocLoader(module_name, file_path)
        poc_loader.set_data(code_string)
        spec = importlib.util.spec_from_file_location(module_name, file_path, loader=poc_loader)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        return mod
    except ImportError:
        error_msg = "load module '{0}' failed!".format(fullname)
        print(error_msg)
        raise


class PocLoader(Loader):
    def __init__(self, fullname, path):
        self.fullname = fullname
        self.path = path
        self.data = None

    def set_data(self, data):
        self.data = data

    def get_filename(self, fullname):
        return self.path

    def get_data(self, filename):
        # Source set via set_data() wins; otherwise read it from disk.
        if filename.startswith('airpoc://') and self.data:
            data = self.data
        else:
            with open(filename, encoding='utf-8') as f:
                data = f.read()
        return data

    def exec_module(self, module):
        filename = self.get_filename(self.fullname)
        poc_code = self.get_data(filename)
        obj = compile(poc_code, filename, 'exec', dont_inherit=True, optimize=-1)
        exec(obj, module.__dict__)

The implementation details are not what we should care about here. We only need to know that load_string_to_module can load a module from source code. If you are interested in how it works, refer to the official Python documentation mentioned above.
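For intuition, the core idea of turning a source string into a module can be reduced to a few lines. This is a simplified stand-in rather than the loader above: the helper load_source and the sample PoC source are made up for the illustration.

```python
import types


def load_source(code_string, name="poc_demo"):
    """Create an empty module and run the source in its namespace."""
    module = types.ModuleType(name)
    code = compile(code_string, "airpoc://" + name, "exec")
    exec(code, module.__dict__)
    return module


# A tiny PoC given as a string, as if it had just been downloaded.
source = "def verify(arg, **kwargs):\n    return {'name': 'demo', 'url': arg}\n"

poc = load_source(source)
result = poc.verify("http://example.com")
print(result)
```

The fuller loader adds a proper module spec on top of this, so the module carries consistent import metadata.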

PoC's code format development

After loading a module, we can write the code that runs it. We need a uniform convention for PoCs so that the program can call them easily.

The complexity of a PoC depends on the structure we design: it can be complete and detailed, or as simple as possible. As mentioned earlier, in order to protect the "security alliance", we need PoCs to be simple and fast to write.

At the same time, we also need to consider the case in which a PoC needs to handle multiple parameters. My rules are defined as follows:

import requests


def verify(arg, **kwargs):
    result = {}
    if requests.get(arg).status_code == 200:
        result = {
            "name": "vulnerability name",
            "url": arg
        }
    return result

Define a verify function in the PoC file for validation. arg is the ordinary parameter, and additional parameters are received through kwargs. When the PoC verification succeeds, we return a dictionary; otherwise we return a falsy value such as an empty dictionary, False or None. The content of the dictionary is up to the PoC writer, which gives the author maximum flexibility.
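As an illustration of this convention, the sketch below shows a PoC that reads an extra parameter from kwargs, plus a caller that treats every falsy or failing result as "not vulnerable". Everything in it is hypothetical: the port parameter, the run_poc helper and the check itself are invented for the example, and no network access is involved.

```python
def verify(arg, **kwargs):
    """Hypothetical PoC following the convention above."""
    # Optional parameters arrive through kwargs; "port" is a made-up example.
    port = kwargs.get("port", 80)
    if arg.startswith("http://") and port == 80:
        return {"name": "demo finding", "url": arg, "port": port}
    return None


def run_poc(poc_verify, target, **kwargs):
    """Call a PoC and normalize every falsy or failing result to None."""
    try:
        return poc_verify(target, **kwargs) or None
    except Exception as exc:
        print("[-] {0} failed: {1}".format(target, exc))
        return None


hit = run_poc(verify, "http://example.com")
miss = run_poc(verify, "http://example.com", port=8080)
print(hit, miss)
```

Normalizing the result in one place keeps the calling code independent of whether a PoC returns {}, False or None on failure.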

Attention please! The quality of a PoC depends on how well its author maintains it.

V0.01

The ultimate goal is to set the targets and have the program automatically load one or more specified PoCs, or all PoCs, and check the targets one by one. The rest of our work is to integrate these functions. We have already implemented the basic framework of AirPoc, and now we only need to implement the functions on top of it. For the convenience of testing, we first create two simple PoCs in the pocs directory according to the rules defined above.


Now the main.py code is:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2019/4/25 3:13 PM
# @Author : w7ay
# @File : main.py

import os
import time

from lib.data import VERSION, PATHS_POCS, POCS
from lib.loader import load_string_to_module


def banner():
    # Raw string, so the backslashes in the ASCII art are kept literally.
    msg = r'''
     ___   _   _____    _____   _____   _____
    /   | | | |  _  \  |  _  \ /  _  \ /  ___|
   / /| | | | | |_| |  | |_| | | | | | | |
  / / | | | | |  _  /  |  ___/ | | | | | |
 / /  | | | | | | \ \  | |     | |_| | | |___
/_/   |_| |_| |_|  \_\ |_|     \_____/ \_____| {}
'''.format(VERSION)
    print(msg)


def init(config: dict):
    print("[*] target:{}".format(config["url"]))
    # Load the PoCs: first traverse the path
    wanted = config.get("poc", [])
    _pocs = []
    for root, dirs, files in os.walk(PATHS_POCS):
        # Skip __init__.py; keep only the PoC files named in config["poc"],
        # or every PoC file when that list is empty
        files = filter(lambda x: not x.startswith("__") and x.endswith(".py")
                       and (not wanted or x in wanted), files)
        _pocs.extend(map(lambda x: os.path.join(root, x), files))

    # Load each PoC according to its path
    for poc in _pocs:
        with open(poc, 'r') as f:
            model = load_string_to_module(f.read())
            POCS.append(model)


def end():
    print("[*] shutting down at {0}".format(time.strftime("%X")))


def start(config: dict):
    url_list = config.get("url", [])
    # Loop over url_list and run every PoC against each target, one by one.
    for i in url_list:
        for poc in POCS:
            try:
                ret = poc.verify(i)
            except Exception as e:
                ret = None
                print(e)
            if ret:
                print(ret)


def main():
    banner()
    config = {
        "url": ["https://www.seebug.org/", "https://paper.seebug.org/"],
        "poc": []
    }
    init(config)
    start(config)
    end()


if __name__ == '__main__':
    main()

Our version has now reached 0.01. It is already a "mature" framework that can run PoCs on its own.

Multi-threaded

To make our framework run faster, we use multithreading to handle the PoCs. Most of the tasks we deal with are I/O-bound, so we don't have to worry about Python threads being "pseudo-threads" limited by the GIL.

The simplest multithreaded model is the producer/consumer model, in which multiple threads consume one queue together. Create a new lib/threads.py:

import threading
import time


def exception_handled_function(thread_function, args=()):
    try:
        thread_function(*args)
    except KeyboardInterrupt:
        raise
    except Exception as ex:
        print("thread {0}: {1}".format(threading.current_thread().name, str(ex)))


def run_threads(num_threads, thread_function, args: tuple = ()):
    threads = []

    # Start multiple threads
    for index in range(num_threads):
        thread = threading.Thread(target=exception_handled_function, name=str(index),
                                  args=(thread_function, args))
        thread.daemon = True
        try:
            thread.start()
        except Exception as ex:
            err_msg = "error occurred while starting new thread ('{0}')".format(str(ex))
            print(err_msg)
            break
        threads.append(thread)

    # Wait for all threads to finish
    alive = True
    while alive:
        alive = False
        for thread in threads:
            if thread.is_alive():
                alive = True
                time.sleep(0.1)

It's worth noting that we didn't use the usual join() to wait for the threads, because while blocked in join(), Python may not respond to user input, so Ctrl+C would appear to do nothing. Instead, the main thread waits in a polling while loop.

Then the main program is changed to multi-threaded mode: the "consumer" part of the original start() is extracted into a separate function that receives its data from the queue, as follows:

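from queue import Empty


def worker():
    # Keep taking tasks until the queue is drained. get_nowait() avoids
    # the race between checking empty() and calling get().
    while True:
        try:
            arg, poc = WORKER.get_nowait()
        except Empty:
            break
        try:
            ret = poc.verify(arg)
        except Exception as e:
            ret = None
            print(e)
        if ret:
            print(ret)


def start(config: dict):
    url_list = config.get("url", [])
    # producer
    for arg in url_list:
        for poc in POCS:
            WORKER.put((arg, poc))
    # consumer
    run_threads(10, worker)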

In addition, the number of threads should be configurable, so we change it to be read from the configuration.

run_threads(config.get("thread_num", 10), worker)

Run it again and you will find it much faster than before!
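The producer/consumer flow can also be tried in isolation. The following self-contained sketch uses only the standard library; fake_verify, the target URLs and the PoC names are placeholders, and the waiting loop polls is_alive() in the same spirit as run_threads.

```python
import queue
import threading
import time

tasks = queue.Queue()
results = []
results_lock = threading.Lock()


def fake_verify(target, poc_name):
    """Placeholder for poc.verify(); returns a result without any network I/O."""
    return {"url": target, "poc": poc_name}


def worker():
    # Consumer: drain the queue, then exit.
    while True:
        try:
            target, poc_name = tasks.get_nowait()
        except queue.Empty:
            return
        ret = fake_verify(target, poc_name)
        with results_lock:
            results.append(ret)


# Producer: one task per (target, PoC) pair.
for target in ["http://a.example", "http://b.example"]:
    for poc_name in ["poc_one", "poc_two", "poc_three"]:
        tasks.put((target, poc_name))

threads = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
for thread in threads:
    thread.start()

# Poll instead of join(), so the main thread stays responsive.
while any(thread.is_alive() for thread in threads):
    time.sleep(0.01)

print(len(results))
```

Each worker loops until the queue is empty, so the number of tasks can exceed the number of threads.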


Network request

This is the last part of our framework: how to unify network requests. Sometimes the requests sent by PoCs need a proxy, a UA header and similar settings, and the framework has to handle these uniformly. To achieve that, we also need a convention in the framework: all network requests are sent with requests. At the beginning we said we would try not to use third-party modules, but the requests module is so good that we make an exception for it...

Thanks to the dynamic nature of Python, we can easily hook a function before it is used and redirect the original method to our custom one, which is the prerequisite for unifying network requests.

def hello(arg):
    return "hello " + arg


def hook(arg):
    arg = arg.upper()
    return "hello " + arg


hello = hook
print(hello("aa"))


By hooking a function, we can achieve our own purpose.
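The same trick works for methods on a class, which is the form needed for a session object. The sketch below keeps a reference to the original method and wraps it, so default settings are injected while the real behaviour is preserved. Client and DEFAULT_HEADERS are made-up stand-ins, not part of requests or Pocsuite.

```python
class Client:
    """Made-up stand-in for a class such as a HTTP session."""

    def request(self, method, url, headers=None):
        return {"method": method, "url": url, "headers": headers or {}}


# Assumed framework-wide setting that every request should carry.
DEFAULT_HEADERS = {"User-Agent": "AirPoc-demo"}

# Keep a reference to the real method before replacing it.
original_request = Client.request


def patched_request(self, method, url, headers=None):
    # Start from the defaults, then let per-call headers override them.
    merged = dict(DEFAULT_HEADERS)
    merged.update(headers or {})
    return original_request(self, method.upper(), url, headers=merged)


# The hook: every Client instance now goes through patched_request.
Client.request = patched_request

response = Client().request("get", "http://example.com", headers={"X-Test": "1"})
print(response)
```

Callers keep using Client().request(...) unchanged, which is what makes this approach suitable for applying settings to PoCs written by other people.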

Tools like sqlmap are based on Python's built-in urllib module, with a lot of their own code for processing network requests; to handle chunked transfer, sqlmap even hooks and rewrites the lower-level httplib library.

To schedule network requests uniformly, Pocsuite hooks the related methods of the requests module. Let's look at the code in detail.

The pocsuite3/lib/request/patch/__init__.py code clearly illustrates the hook function.

from .remove_ssl_verify import remove_ssl_verify
from .remove_warnings import disable_warnings
from .hook_request import patch_session
from .add_httpraw import patch_addraw
from .hook_request_redirect import patch_redirect


def patch_all():
    disable_warnings()
    remove_ssl_verify()
    patch_session()
    patch_addraw()
    patch_redirect()

If you look at the source of requests, you will see that the key is how it hooks the Session.request method.

pocsuite3/lib/request/patch/hook_request.py

from pocsuite3.lib.core.data import conf
from requests.models import Request
from requests.sessions import Session
from requests.sessions import merge_setting, merge_cookies
from requests.cookies import RequestsCookieJar
from requests.utils import get_encodings_from_content


def session_request(self, method, url,
                    params=None, data=None, headers=None, cookies=None, files=None, auth=None,
                    timeout=conf.timeout if 'timeout' in conf else None,
                    allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None):
    # Create the Request
    merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies),
                                   cookies or (conf.cookie if 'cookie' in conf else None))

    req = Request(
        method=method.upper(),
        url=url,
        headers=merge_setting(headers, conf.http_headers if 'http_headers' in conf else {}),
        files=files,
        data=data or {},
        json=json,
        params=params or {},
        auth=auth,
        cookies=merged_cookies,
        hooks=hooks,
    )
    prep = self.prepare_request(req)

    proxies = proxies or (conf.proxies if 'proxies' in conf else {})

    settings = self.merge_environment_settings(
        prep.url, proxies, stream, verify, cert
    )

    # Send the request.
    send_kwargs = {
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    send_kwargs.update(settings)
    resp = self.send(prep, **send_kwargs)

    if resp.encoding == 'ISO-8859-1':
        encodings = get_encodings_from_content(resp.text)
        if encodings:
            encoding = encodings[0]
        else:
            encoding = resp.apparent_encoding

        resp.encoding = encoding

    return resp


def patch_session():
    Session.request = session_request

It replaces Session.request with session_request, which lets us apply custom settings such as our own headers. You may need to read the requests source to fully understand the code above, but it doesn't matter: we can use it directly in the spirit of borrowing what works.

In order to achieve this and better optimize the framework, we also need to make some minor adjustments.

Create a new lib/requests.py

from lib.data import CONF
from requests.models import Request
from requests.sessions import Session
from requests.sessions import merge_setting, merge_cookies
from requests.cookies import RequestsCookieJar
from requests.utils import get_encodings_from_content


def session_request(self, method, url,
                    params=None, data=None, headers=None, cookies=None, files=None, auth=None,
                    timeout=None,
                    allow_redirects=True, proxies=None, hooks=None, stream=None, verify=False, cert=None, json=None):
    # Create the Request.
    # Request-related settings live under the "requests" key of CONF.
    conf = CONF.get("requests", {})
    if timeout is None and "timeout" in conf:
        timeout = conf["timeout"]
    # conf is a plain dict here, so it is indexed by key.
    merged_cookies = merge_cookies(merge_cookies(RequestsCookieJar(), self.cookies),
                                   cookies or (conf["cookie"] if 'cookie' in conf else None))

    req = Request(
        method=method.upper(),
        url=url,
        headers=merge_setting(headers, conf["headers"] if 'headers' in conf else {}),
        files=files,
        data=data or {},
        json=json,
        params=params or {},
        auth=auth,
        cookies=merged_cookies,
        hooks=hooks,
    )
    prep = self.prepare_request(req)

    proxies = proxies or (conf["proxies"] if 'proxies' in conf else {})

    settings = self.merge_environment_settings(
        prep.url, proxies, stream, verify, cert
    )

    # Send the request.
    send_kwargs = {
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    send_kwargs.update(settings)
    resp = self.send(prep, **send_kwargs)

    if resp.encoding == 'ISO-8859-1':
        encodings = get_encodings_from_content(resp.text)
        if encodings:
            encoding = encodings[0]
        else:
            encoding = resp.apparent_encoding

        resp.encoding = encoding

    return resp


def patch_session():
    Session.request = session_request

Also reserve a place for the request settings in config, and execute our hook during init.
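A rough sketch of what this wiring could look like is shown here. It is an assumption based on the keys that session_request reads from CONF ("requests", then "timeout", "headers", "proxies" and "cookie"); the header value is illustrative, and patch_session is replaced by a placeholder so the snippet runs without the requests package.

```python
CONF = {}  # stands in for lib.data.CONF


def patch_session():
    """Placeholder for lib.requests.patch_session in this sketch."""
    CONF["patched"] = True


def init(config):
    # Expose the request settings where session_request() looks them up.
    CONF["requests"] = config.get("requests", {})
    # Install the hook before any PoC sends a request.
    patch_session()


config = {
    "url": ["http://www.httpbin.org/get"],
    "poc": [],
    "thread_num": 10,
    # Settings applied to every request sent by a PoC (illustrative values).
    "requests": {
        "timeout": 10,
        "headers": {"User-Agent": "AirPoc-demo"},
    },
}

init(config)
print(CONF["requests"])
```

In the real framework, init would also keep loading the PoCs as before; only the two added steps are shown.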

Let's write a new PoC and use this site to test the final effect: http://www.httpbin.org/get

pocs/poc.py

import requests


def verify(arg, **kwargs):
    r = requests.get(arg)
    if r.status_code == 200:
        return {"url": arg, "text": r.text}


The effect is very good, but if you add an HTTPS website, a warning message appears.


We again follow Pocsuite's approach to disable the warning.

from urllib3 import disable_warnings

disable_warnings()

Finally, change the version number to 0.1, and the framework part of AirPoc is generally completed.

At last

Many of AirPoc's structural ideas come from Pocsuite. If you read the Pocsuite source directly, you may learn a lot. At present, the AirPoc v0.1 infrastructure is almost complete, and one or more PoCs can be loaded locally for batch testing. Later, we will try some more fun things, e.g. how to verify vulnerabilities that give no echo, how to generate shellcode, and how to handle a reverse shell, so stay tuned for the next part.

Download AirPoc: https://images.seebug.org/archive/airpoc.zip

About Knownsec & 404 Team

Beijing Knownsec Information Technology Co., Ltd. was established by a group of high-profile international security experts. It has over a hundred frontier security talents nationwide as the core security research team to provide long-term internationally advanced network security solutions for the government and enterprises.

Knownsec's specialties include network attack and defense integrated technologies and product R&D under new situations. It provides visualization solutions that meet world-class security technology standards and enhances the security monitoring, alarm and defense abilities of customer networks with its industry-leading capabilities in cloud computing and big data processing. The company's technical strength is widely recognized by the State Ministry of Public Security, the Central Government Procurement Center, the Ministry of Industry and Information Technology (MIIT), China National Vulnerability Database of Information Security (CNNVD), the Central Bank, the Hong Kong Jockey Club, Microsoft, Zhejiang Satellite TV and other well-known clients.

404 Team, the core security team of Knownsec, is dedicated to the research of security vulnerabilities and offensive and defensive technology in the fields of Web, IoT, industrial control, blockchain, etc. 404 Team has submitted vulnerability research to many well-known vendors such as Microsoft, Apple, Adobe, Tencent, Alibaba and Baidu, and has received a high reputation in the industry.

The most well-known sharing of Knownsec 404 Team includes: KCon Hacking Conference, Seebug Vulnerability Database and ZoomEye Cyberspace Search Engine.
