정구리의 우주정복

인공지능을 활용한 악성파일 탐지 시스템 소스코드 (서정우 작성 부분) 본문

PYTHON/PROJECT

인공지능을 활용한 악성파일 탐지 시스템 소스코드 (서정우 작성 부분)

Jungry_ 2023. 1. 30. 13:15
반응형

 

 

File 분류 소스코드

import numpy
import pandas
import os
import shutil

# Sort every file in dir_path into the malware / normal folder according to
# the labels listed in label.csv (column 0: file name without its 4-character
# extension, column 1: label, 1 = malware, 0 = normal).
file_root = 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\label.csv'
dir_path = 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\test_folder'

# Destination folder for each label value.
move_targets = {
    1: 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\malware\\',
    0: 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\normal\\',
}

# Read names as strings so that purely numeric-looking names are not turned
# into numbers and then fail the lookup below.
csv_data = pandas.read_csv(file_root, header=None, dtype={0: str})

# Build the name -> label table once instead of scanning the frame per file.
label_by_name = dict(zip(csv_data[0], csv_data[1]))

for file_name in os.listdir(dir_path):
    stem = file_name[:-4]  # drop the 4-character extension, e.g. ".exe"
    if stem not in label_by_name:
        # Unlabeled file: leave it where it is (never reuse a previous label).
        continue
    move_dir = move_targets.get(int(label_by_name[stem]))
    if move_dir is None:
        # Label is neither 0 nor 1: nothing to do for this file.
        continue
    shutil.move(dir_path + '\\' + file_name, move_dir + file_name)

print('work it')

Pe 헤더 데이터 추출 소스

import csv
import os
import pefile
import math
import hashlib



# Column names for the DOS header fields, in the order they are written.
IMAGE_DOS_HEADER = [
    'e_magic',
    'e_cblp',
    'e_cp',
    'e_crlc',
    'e_cparhdr',
    'e_minalloc',
    'e_maxalloc',
    'e_ss',
    'e_sp',
    'e_csum',
    'e_ip',
    'e_cs',
    'e_lfarlc',
    'e_ovno',
    'e_res',
    'e_oemid',
    'e_oeminfo',
    'e_res2',
    'e_lfanew',
]

# Column names for the COFF file header fields.
FILE_HEADER = [
    "Machine",
    "NumberOfSections",
    "TimeDateStamp",
    "PointerToSymbolTable",
    "NumberOfSymbols",
    "SizeOfOptionalHeader",
    "Characteristics",
]

# Column names for the optional header fields.
OPTIONAL_HEADER = [
    "Magic",
    "MajorLinkerVersion",
    "MinorLinkerVersion",
    "SizeOfCode",
    "SizeOfInitializedData",
    "SizeOfUninitializedData",
    "AddressOfEntryPoint",
    "BaseOfCode",
    "BaseOfData",
    "ImageBase",
    "SectionAlignment",
    "FileAlignment",
    "MajorOperatingSystemVersion",
    "MinorOperatingSystemVersion",
    "MajorImageVersion",
    "MinorImageVersion",
    "MajorSubsystemVersion",
    "MinorSubsystemVersion",
    "SizeOfImage",
    "SizeOfHeaders",
    "CheckSum",
    "Subsystem",
    "DllCharacteristics",
    "SizeOfStackReserve",
    "SizeOfStackCommit",
    "SizeOfHeapReserve",
    "SizeOfHeapCommit",
    "LoaderFlags",
    "NumberOfRvaAndSizes",
]



def extract_image_dos_header(pe):
    """Return the 19 DOS header values of *pe* as a list.

    The values are read from ``pe.DOS_HEADER`` in a fixed order (``e_magic``
    first, ``e_lfanew`` last). If the header or any field cannot be read, the
    error is printed and a list of 19 zeros is returned instead.
    """
    field_names = (
        'e_magic', 'e_cblp', 'e_cp', 'e_crlc', 'e_cparhdr', 'e_minalloc',
        'e_maxalloc', 'e_ss', 'e_sp', 'e_csum', 'e_ip', 'e_cs', 'e_lfarlc',
        'e_ovno', 'e_res', 'e_oemid', 'e_oeminfo', 'e_res2', 'e_lfanew',
    )
    try:
        header = pe.DOS_HEADER
        # All-or-nothing: one unreadable field falls back to the zero row.
        return [getattr(header, name) for name in field_names]
    except Exception as e:
        print(e)
        return [0] * len(field_names)


def extract_file_header(pe):
    """Return the 7 COFF file header values of *pe* as a list.

    The values are read from ``pe.FILE_HEADER`` in this order: Machine,
    NumberOfSections, TimeDateStamp, PointerToSymbolTable, NumberOfSymbols,
    SizeOfOptionalHeader, Characteristics. If the header or any field cannot
    be read, the error is printed and a list of 7 zeros is returned instead.
    """
    field_names = (
        'Machine',
        # NumberOfSections must be included: the result has to line up with
        # the 7 file-header column names written to the CSV header row.
        'NumberOfSections',
        'TimeDateStamp',
        'PointerToSymbolTable',
        'NumberOfSymbols',
        'SizeOfOptionalHeader',
        'Characteristics',
    )
    try:
        header = pe.FILE_HEADER
        return [getattr(header, name) for name in field_names]
    except Exception as e:
        print(e)
        return [0] * len(field_names)



def extract_optional_header(pe):
    """Return the 29 optional header values of *pe* as a list.

    The values are read from ``pe.OPTIONAL_HEADER`` in a fixed order (``Magic``
    first, ``NumberOfRvaAndSizes`` last). A field that the header does not
    have is reported as 0 while the remaining fields are still returned; this
    keeps the other values for PE32+ headers, which carry no ``BaseOfData``.
    If ``pe.OPTIONAL_HEADER`` itself cannot be read, the error is printed and
    a list of 29 zeros is returned.
    """
    field_names = (
        'Magic', 'MajorLinkerVersion', 'MinorLinkerVersion', 'SizeOfCode',
        'SizeOfInitializedData', 'SizeOfUninitializedData',
        'AddressOfEntryPoint', 'BaseOfCode', 'BaseOfData', 'ImageBase',
        'SectionAlignment', 'FileAlignment', 'MajorOperatingSystemVersion',
        'MinorOperatingSystemVersion', 'MajorImageVersion',
        'MinorImageVersion', 'MajorSubsystemVersion', 'MinorSubsystemVersion',
        'SizeOfImage', 'SizeOfHeaders', 'CheckSum', 'Subsystem',
        'DllCharacteristics', 'SizeOfStackReserve', 'SizeOfStackCommit',
        'SizeOfHeapReserve', 'SizeOfHeapCommit', 'LoaderFlags',
        'NumberOfRvaAndSizes',
    )
    try:
        header = pe.OPTIONAL_HEADER
    except Exception as e:
        # Report the actual error instead of printing an empty line.
        print(e)
        return [0] * len(field_names)
    return [getattr(header, name, 0) for name in field_names]



def extract_features(pe):
    """Return one flat feature list for *pe*.

    The list is the DOS header values, followed by the file header values,
    followed by the optional header values.
    """
    features = []
    for extractor in (extract_image_dos_header,
                      extract_file_header,
                      extract_optional_header):
        features.extend(extractor(pe))
    return features





def main():
    """Extract header features from every file under source_path into a CSV.

    Writes one header row (DOS + file + optional header column names plus
    'label') and then one row per file that pefile can parse. Files that
    cannot be loaded or processed are reported on stdout and skipped.
    """
    source_path = r'C:\Users\833-24\Desktop\새 폴더\malware'
    output_file = r'C:\Users\833-24\Desktop\새 폴더\malware\result.csv'
    # NOTE(review): every row is labelled 'clean' although the source folder
    # is named 'malware' - confirm this is the intended label.
    label = ['clean']

    # The CSV lives inside the scanned tree; remember it so it is not parsed.
    output_abs = os.path.abspath(output_file)

    # newline='' is what the csv module expects for files it writes to.
    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(IMAGE_DOS_HEADER + FILE_HEADER + OPTIONAL_HEADER + ['label'])

        for subdir, dirs, files in os.walk(source_path):
            for file_name in files:
                # Join with the directory being walked so that files in
                # sub-folders are found as well.
                input_file = os.path.join(subdir, file_name)
                if os.path.abspath(input_file) == output_abs:
                    continue
                try:
                    pe = pefile.PE(input_file)
                except Exception as e:
                    print("Exception while loading file : %s" % e)
                    continue
                try:
                    features = extract_features(pe)
                    writer.writerow(features + label)
                except Exception as e:
                    print("Exception while opening and write CSV file : %s" % e)
                finally:
                    # Release the handle pefile keeps on the input file.
                    pe.close()
    print('It work !!')


if __name__ == '__main__':
    main()

N-Gram 소스코드

import os
import pefile
import pydasm
import operator
import csv
import hashlib

from itertools import chain
from capstone import *
from capstone.x86 import *

class NGRAM_FEATURE:
    """Turn PE files into n-gram count features and write them to a CSV file."""

    def __init__(self, output_file):
        # Path of the CSV file that receives the header and the data rows.
        self.output_file = output_file
        # Shared n-gram -> count table, filled by n_grams(..., check=1).
        self.grams = dict()

    def get_asm(self, mode, file):
        """Return the code of the PE file *file* as a list of tokens.

        The region read is the section that contains the entry point; if no
        section contains it, ``SizeOfCode`` bytes starting at the entry point
        are used.

        mode -- truthy: return the region as two-character hex strings, one
                per byte; falsy: disassemble the region and return the
                instruction mnemonics.
        file -- path of the PE file.
        """
        pe = pefile.PE(file)
        try:
            start = pe.OPTIONAL_HEADER.AddressOfEntryPoint
            length = pe.OPTIONAL_HEADER.SizeOfCode

            for section in pe.sections:
                addr = section.VirtualAddress
                size = section.Misc_VirtualSize
                # Inclusive lower bound: an entry point located exactly at
                # the start of a section belongs to that section.
                if addr <= start < addr + size:
                    start = addr
                    length = size
                    break
            data = pe.get_memory_mapped_image()[start:start + length]
        finally:
            # Release the handle pefile keeps on the input file.
            pe.close()

        if mode:
            return ['%02x' % byte for byte in bytearray(data)]

        # NOTE(review): always disassembles as 32-bit x86 - confirm that
        # 64-bit samples are not expected here.
        md = Cs(CS_ARCH_X86, CS_MODE_32)
        md.detail = False
        # 0x401000 is the address handed to the disassembler for the first
        # byte; only the mnemonics are collected.
        return [insn.mnemonic for insn in md.disasm(data, 0x401000)]

    def gen_list_n_gram(self, n, asm_code):
        """Yield consecutive, non-overlapping slices of *asm_code* of length *n*.

        The last slice is shorter when len(asm_code) is not a multiple of n.
        """
        for start in range(0, len(asm_code), n):
            yield asm_code[start:start + n]

    def n_grams(self, n, asm_code, check):
        """Count the n-grams of *asm_code* and return the count table.

        check -- truthy (1): add the counts to the shared ``self.grams`` table
                 and return it; falsy (0): count into a fresh dict and return
                 that, leaving ``self.grams`` untouched.
        Each key is the tokens of one slice joined with single spaces.
        """
        grams = self.grams if check else dict()

        for chunk in self.gen_list_n_gram(n, asm_code):
            key = ' '.join(chunk)
            grams[key] = grams.get(key, 0) + 1

        return grams

    def write_csv_header(self, csv_header):
        """Create (or truncate) the output CSV and write its header row.

        The row is 'filename', 'MD5', the names in *csv_header*, then 'label'.
        """
        header = ['filename', 'MD5'] + csv_header + ['label']

        # 'w' starts a fresh file; newline='' is what the csv module expects.
        with open(self.output_file, 'w', newline='') as out:
            csv.writer(out, delimiter=',').writerow(header)

    def count_n_gram(self, grams, header, label):
        """Return the count of every n-gram in *header*, followed by *label*.

        N-grams that do not occur in *grams* are counted as 0.
        """
        grams_count = [grams.get(gram, 0) for gram in header]
        grams_count.append(label)
        return grams_count

    def calc_file_hash(self, file_path):
        """Return the hexadecimal MD5 digest of the file at *file_path*."""
        with open(file_path, 'rb') as f:
            data = f.read()
        return hashlib.md5(data).hexdigest()

    def write_csv_data(self, data):
        """Append *data* as one row to the output CSV."""
        with open(self.output_file, 'a', newline='') as out:
            csv.writer(out, delimiter=',').writerow(data)

def _accumulate_grams(nf, folder):
    """Add the 4-gram counts of every file in *folder* to nf.grams."""
    for name in os.listdir(folder):
        asm_code = nf.get_asm(0, os.path.join(folder, name))
        nf.n_grams(4, asm_code, 1)


def _write_rows(nf, folder, csv_header, label, tag):
    """Append one CSV row (name, MD5, counts, label) per file in *folder*."""
    for name in os.listdir(folder):
        print(tag + ' FILE NAME = ' + name)
        file_path = os.path.join(folder, name)
        asm_code = nf.get_asm(0, file_path)
        grams = nf.n_grams(4, asm_code, 0)  # per-file counts only
        grams_count = nf.count_n_gram(grams, csv_header, label)
        md5_hash = nf.calc_file_hash(file_path)

        data = [name, md5_hash]
        data.extend(grams_count)
        nf.write_csv_data(data)


def main():
    """Build the 4-gram feature CSV for the malware and normal sample folders.

    First pass: count the 4-grams of all files in both folders and keep the
    num_feature most frequent ones as CSV columns. Second pass: write one row
    per file with its own counts for those columns, labelled 1 for the
    malware folder and 0 for the normal folder.
    """
    num_feature = 100

    mal_path = '/home/jungry/Desktop/test_folder/'
    nor_path = '/home/jungry/Desktop/test_normal/'
    output_file = '/home/jungry/Desktop/ngram123.csv'

    print('Get N-gram from files')

    nf = NGRAM_FEATURE(output_file)

    _accumulate_grams(nf, mal_path)
    _accumulate_grams(nf, nor_path)

    # Read the shared table directly: it also exists when both folders are
    # empty, unlike a loop variable.
    sort_grams = sorted(nf.grams.items(), key=operator.itemgetter(1), reverse=True)
    feature = sort_grams[0:num_feature]
    # Take the names only, so that counts can never end up in the header when
    # fewer than num_feature distinct n-grams were found.
    csv_header = [gram for gram, _count in feature]
    nf.write_csv_header(csv_header)

    print('COMPLETE HEADER')

    _write_rows(nf, mal_path, csv_header, 1, 'MAL')
    print('***COMPLETE MALWARE DATA***')

    _write_rows(nf, nor_path, csv_header, 0, 'NOR')
    print('***COMPLETE NORMAL DATA***')


if __name__ == '__main__':
    main()

Tkinter를 이용한 시각화 소스코드

#-*-coding:utf-8-*-

import os
import threading
import time
import subprocess
import sys
# Python 2 only: force UTF-8 as the default string encoding. Python 3 has no
# builtin reload() and no sys.setdefaultencoding(), and already uses UTF-8.
if sys.version_info[0] < 3:
    reload(sys)
    sys.setdefaultencoding('utf-8')

import tkinter.ttk as ttk
import tkinter.font as tkFont

from tkinter import filedialog
from tkinter import * 

# Module-level list; a `global` statement is not needed at module scope.
output = []

# ============================ Window1 =================================
"""
def delete_window():
    
    global delWin
    delWin = Tk()
    delWin.title("Finished Scan")
    delWin.geometry("200x120")
    delWin.resizable(0, 0)
    
    popUp_label = Label(delWin, text='Delete?')
    popUp_label.pack(pady=20)
    cancleBtn = Button(delWin, text='Cancle', command=delWin.destroy)
    cancleBtn.pack(side='bottom', pady=5)
    delBtn = Button(delWin, text='Delete', command=midDel)
    delBtn.pack(side='bottom')
    delWin.mainloop() 
"""
# ============================ Window2 =================================

def program_info():
    """Open the "Program Information" window.

    The window shows the project title, the team name, a description of the
    program and a short usage manual, plus a Close button. It is created as a
    Toplevel of the existing application window, so it is served by the
    application's running main loop.
    """
    infoWindow = Toplevel()
    infoWindow.title("Program Information")
    infoWindow.geometry("640x480+100+100")
    infoWindow.resizable(0, 0)

    # NOTE(review): `family` is given a font file path, not a family name -
    # confirm the intended font is actually picked up.
    test_font = tkFont.Font(family="/usr/share/fonts/Wemakeprice/Wemakeprice-Regular", size=12)
    # Keep a reference on the window: the named font is deleted when the Font
    # object is garbage-collected, which would happen once this function returns.
    infoWindow.test_font = test_font

    program_def = '''윈도우의 PE 구조의 파일을 PE헤더, 4-gram(N-gram), 바이너리 이미지의
방법으로 특징을 추출하였고 분류 알고리즘을 이용하여 모델링을 진행,
가장 높은 정확도가 나오는 방법을 택해서 본 프로젝트를 진행하였다.
PE헤더와 check_packer 기능과 함께 추출한 특징을 RandomForest 알고리즘을
이용한 방법을 활용하여 Kicom 오픈소스 백신을 이용, GUI 프로그램을 만들어
프로젝트를 완료하였다.
'''
    manual = '''1. 스캔하고자 하는 폴더를 선택합니다.
2. 선택된 경로를 확인하고 스캔 버튼을 눌러서 결과를 확인합니다.
3. 악성 확률을 확인하고 파일을 지울지 선택합니다. '''

    # Static texts are passed directly: a local StringVar would be unset as
    # soon as this function returns and the labels would go blank.
    Label(infoWindow, text="2021 정보보호학과 졸업작품").pack(side='top')
    Label(infoWindow, text="인공지능을 활용한 악성파일 탐지 시스템 ").pack(side='top')
    Label(infoWindow, text="팀명 : 네카라쿠배").pack(side='top')
    Label(infoWindow, justify=LEFT, text=program_def, font=test_font).pack(side='top')
    Label(infoWindow, justify=LEFT, text=manual, font=test_font).pack(side='top')

    closeBtn = Button(infoWindow, text='Close', command=infoWindow.destroy)
    closeBtn.pack(side='bottom', pady=10)

# ============================ Window3 =================================
def team_info():
    """Open the team's web page in the web browser (new=1)."""
    from webbrowser import open as open_in_browser

    open_in_browser("http://www.nekabe.me", new=1)

# ============================ Function ================================
def delMal():
    """Run ``python k2.py -l <path>`` and then close the result pop-up.

    Uses the module-level ``path`` (set by openDir) and ``popUp`` (created by
    run). Blocks until the child process has finished; midDel calls it on a
    worker thread.
    """
    comm = ['python', 'k2.py', '-l', path]
    print(comm)
    subprocess.call(comm)
    popUp.destroy()

def midDel():
    """Start delMal on a new thread and return immediately."""
    threading.Thread(target=delMal).start()

def run():
    """Scan the selected folder and show the output, then ask about deletion.

    Runs ``python k2.py -f <path>``, appends every line the child process
    prints (stdout and stderr) to ``outputBox``, and afterwards opens a small
    "Delete?" pop-up whose Delete button calls midDel. midRun calls this
    function on a worker thread.
    """
    try:
        target = path
    except NameError:
        # No folder has been selected yet (openDir never set `path`).
        outputBox.insert('end', 'Select a folder before scanning.\n')
        outputBox.see(END)
        return

    comm = ['python', 'k2.py', '-f', target]
    print(comm)
    # universal_newlines=True makes the pipe return text, so the lines can be
    # compared with '' and concatenated with '\n' on Python 3 as well.
    p = subprocess.Popen(comm, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         shell=False, universal_newlines=True)
    # readline() returns '' only at end of output.
    for res in iter(p.stdout.readline, ''):
        outputBox.insert('end', res.strip() + '\n')
        outputBox.see(END)
    p.wait()

    # NOTE(review): this creates a second Tk root with its own main loop on
    # the worker thread; a Toplevel of the main window is the usual approach -
    # confirm before changing, delMal relies on the global `popUp`.
    global popUp
    popUp = Tk()
    popUp.title('Result Scan')
    popUp.geometry("200x120+200+200")
    popUp.resizable(0, 0)

    popUp_label = Label(popUp, text='Delete?')
    popUp_label.pack(pady=20)

    cancelBtn = Button(popUp, text='Cancel', command=popUp.destroy)
    cancelBtn.pack(side='bottom', pady=5)

    delBtn = Button(popUp, text='Delete', command=midDel)
    delBtn.pack(side='bottom')
    popUp.mainloop()
    
def midRun():
    """Start run on a new thread and return immediately."""
    threading.Thread(target=run).start()

# ============================ Function ================================
def openDir():
    """Let the user pick the folder to scan and remember it in ``path``.

    The chosen folder is stored in the module-level ``path`` and shown in the
    path label. If the dialog is cancelled, the previous selection and label
    are left unchanged.
    """
    global path
    # NOTE(review): "Deskop" in initialdir looks like a typo for "Desktop" -
    # confirm the intended start folder.
    dir_path = filedialog.askdirectory(parent=root, initialdir="/home/stud/Deskop/", title="Select Dir")
    if not dir_path:
        # Cancelled dialog returns an empty value; do not overwrite `path`.
        return
    path = dir_path
    lblText.set('Selected\n' + path)

def force_quit():
    """Placeholder that does nothing and returns None."""
    return None

# ============================== Main ==================================
# Main application window: 640x480, placed at screen offset (200, 200),
# not resizable in either direction.
root = Tk()
root.geometry('640x480+200+200')
root.resizable(0, 0)
root.title("네카라쿠배 v1.0")

# ============================= Frame1 =================================
# Top strip of the window: logo on the left, program title on the right.
top_frame = Frame(root, width=640, height=100)
top_frame.pack(side='top', fill='x')

# Created here but not referenced by the code visible in this file.
title_font = tkFont.Font(size=16)

# Left and right halves of the top strip
frame_left  = Frame(top_frame, width=240, height=100)
frame_right = Frame(top_frame, width=400, height=100)
frame_left.pack(side='left', fill='x')
frame_right.pack(side='right', fill='x') 

# Logo image, loaded from 'jbu.png' (relative path). The module-level name
# `image` also keeps the PhotoImage referenced while the label shows it.
image = PhotoImage(file='jbu.png')
logo_label = Label(frame_left, image=image)
logo_label.place(x=130, y=0)

# Title label (earlier drafts set Korean titles here; the English one is used)
title_text = StringVar()
title_text.set("MalWare Scanner")

title_label = Label(frame_right, textvariable=title_text, font=("Verdana", 24,))
title_label.place(x=0, y=40)

# ============================= Frame1-2 =================================
# Second strip: "[ Result ]" caption on the left, info buttons on the right.
top2_frame = Frame(root, width=600, height=40)
top2_frame.pack(side='top', fill='x', ipadx=35)

# Caption shown above the result box
result_text = StringVar()
result_text.set("[ Result ]")

result_label = Label(top2_frame, textvariable=result_text)
result_label.pack(side='left', padx=35)

# Empty label packed first on the right side; it only takes up space there.
test_label = Label(top2_frame, text='')
test_label.pack(side='right', padx=15)

# CREDIT button: calls team_info
tInfoBtn = Button(top2_frame, width=12, text="CREDIT", command=team_info)
tInfoBtn.pack(side='right', padx=5, pady=2)

# PROGRAM INFO button: calls program_info
pInfoBtn = Button(top2_frame, width=12, text="PROGRAM INFO", command=program_info)
pInfoBtn.pack(side='right')

# ============================== Frame2 ================================
# Middle area holding the text box that run() appends the scan output to.
middle_frame = Frame(root, width=640, height=280)
middle_frame.pack(side='top')

# Result text box
outputBox = Text(middle_frame)
outputBox.pack(expand=True)

# ============================== Frame3 ================================
# Bottom strip: selected-path label, copyright text and the three action
# buttons. The buttons are packed on the right, so they appear in reverse
# packing order: Select Folder, Scan, Quit.
bottom_frame = Frame(root, width=640, height=80)
bottom_frame.pack(side='bottom', fill='x')

# Text of the path label; openDir replaces it with the selected folder.
lblText = StringVar()
lblText.set("Not Search <Dir>")

# Path label
path_lbl = Label(bottom_frame, textvariable=lblText)
path_lbl.pack(pady=5)

# Quit button (3rd from the left): closes the main window
quitBtn = Button(bottom_frame, width=12, text="Quit", command=root.destroy)
quitBtn.pack(side="right", padx=5, pady=7)

# Scan button (2nd from the left): starts the scan on a worker thread
scanBtn = Button(bottom_frame, width=12, text="Scan", command=midRun)
scanBtn.pack(side="right", padx=5)

# Select Folder button (1st from the left): opens the folder dialog
openBtn = Button(bottom_frame, width=12, text="Select Folder", command=openDir)
openBtn.pack(side="right", padx=5)

copyright = StringVar()
copyright.set("Copyright 2021 Nekalikube")
# Create and pack in two steps: pack() returns None, so chaining it would
# bind copyrightLabel to None instead of the widget.
copyrightLabel = Label(bottom_frame, textvariable=copyright)
copyrightLabel.pack(side='left', padx=5)
# ============================== END CODE ==============================
root.mainloop()
반응형
Comments