정구리의 우주정복
인공지능을 활용한 악성파일 탐지 시스템 소스코드 (서정우 작성 부분) 본문
반응형
File 분류 소스코드
import numpy
import pandas
import os
import shutil
# Sort every sample found in dir_path into the malware or normal folder,
# using label.csv as the lookup table: column 0 holds the file name without
# its extension, column 1 holds the label (1 = malware, 0 = normal).
file_root = 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\label.csv'
dir_path = 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\test_folder'
# Destination folder for each label value; any other label is left in place.
label_dirs = {
    1: 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\malware\\',
    0: 'C:\\Users\\user\\Desktop\\정우의 겅부\\졸작\\normal\\',
}
# The file imports the module as ``pandas`` (there is no ``pd`` alias).
csv_data = pandas.read_csv(file_root, header=None)
file_list = os.listdir(dir_path)
# Build the name -> label table once instead of filtering the frame per file.
labels_by_name = dict(zip(csv_data[0], csv_data[1]))
csv_name_list = set(labels_by_name)
for i in file_list:
    # splitext copes with extensions of any length (the old i[:-4] assumed
    # exactly three characters after the dot).
    now = os.path.splitext(i)[0]
    if now not in csv_name_list:
        continue
    num = int(labels_by_name[now])  # i is the file name, num its label
    move_src = label_dirs.get(num)
    if move_src is not None:
        shutil.move(os.path.join(dir_path, i), move_src + i)
print('work it')
PE 헤더 데이터 추출 소스코드
import csv
import os
import pefile
import math
import hashlib
# CSV column names, one list per PE header structure. Each list is written in
# the order the columns appear in the CSV header row.
IMAGE_DOS_HEADER = (
    'e_magic e_cblp e_cp e_crlc e_cparhdr e_minalloc e_maxalloc e_ss e_sp '
    'e_csum e_ip e_cs e_lfarlc e_ovno e_res e_oemid e_oeminfo e_res2 e_lfanew'
).split()

FILE_HEADER = (
    'Machine NumberOfSections TimeDateStamp PointerToSymbolTable '
    'NumberOfSymbols SizeOfOptionalHeader Characteristics'
).split()

OPTIONAL_HEADER = (
    'Magic MajorLinkerVersion MinorLinkerVersion SizeOfCode '
    'SizeOfInitializedData SizeOfUninitializedData AddressOfEntryPoint '
    'BaseOfCode BaseOfData ImageBase SectionAlignment FileAlignment '
    'MajorOperatingSystemVersion MinorOperatingSystemVersion '
    'MajorImageVersion MinorImageVersion '
    'MajorSubsystemVersion MinorSubsystemVersion '
    'SizeOfImage SizeOfHeaders CheckSum Subsystem DllCharacteristics '
    'SizeOfStackReserve SizeOfStackCommit '
    'SizeOfHeapReserve SizeOfHeapCommit '
    'LoaderFlags NumberOfRvaAndSizes'
).split()
def extract_image_dos_header(pe):
    """Return the 19 DOS-header fields of *pe* as a list.

    Args:
        pe: an object exposing a ``DOS_HEADER`` attribute with the ``e_*``
            fields read below (the caller passes a ``pefile.PE`` instance).

    Returns:
        A 19-item list in the order e_magic .. e_lfanew. If any field cannot
        be read, the error is printed and a list of 19 zeros is returned so
        the CSV row keeps its width.
    """
    try:
        dos = pe.DOS_HEADER
        return [
            dos.e_magic,
            dos.e_cblp,
            dos.e_cp,
            dos.e_crlc,
            dos.e_cparhdr,
            dos.e_minalloc,
            dos.e_maxalloc,
            dos.e_ss,
            dos.e_sp,
            dos.e_csum,
            dos.e_ip,
            dos.e_cs,
            dos.e_lfarlc,
            dos.e_ovno,
            dos.e_res,
            dos.e_oemid,
            dos.e_oeminfo,
            dos.e_res2,
            dos.e_lfanew,
        ]
    except Exception as e:  # best effort: a broken header must not stop the run
        print(e)
        return [0] * 19
def extract_file_header(pe):
    """Return the 7 COFF file-header fields of *pe* as a list.

    Args:
        pe: an object exposing a ``FILE_HEADER`` attribute (the caller passes
            a ``pefile.PE`` instance).

    Returns:
        ``[Machine, NumberOfSections, TimeDateStamp, PointerToSymbolTable,
        NumberOfSymbols, SizeOfOptionalHeader, Characteristics]``. If a field
        cannot be read, the error is printed and 7 zeros are returned.
    """
    try:
        header = pe.FILE_HEADER
        return [
            header.Machine,
            # Previously omitted, which made the row one value short and
            # shifted every following CSV column.
            header.NumberOfSections,
            header.TimeDateStamp,
            header.PointerToSymbolTable,
            header.NumberOfSymbols,
            header.SizeOfOptionalHeader,
            header.Characteristics,
        ]
    except Exception as e:  # best effort: keep the row width on failure
        print(e)
        return [0] * 7
def extract_optional_header(pe):
    """Return the 29 optional-header fields of *pe* as a list.

    Args:
        pe: an object exposing an ``OPTIONAL_HEADER`` attribute (the caller
            passes a ``pefile.PE`` instance).

    Returns:
        A 29-item list in the order Magic .. NumberOfRvaAndSizes. A missing
        ``BaseOfData`` is reported as 0. If any other field cannot be read,
        the error is printed and 29 zeros are returned.
    """
    try:
        opt = pe.OPTIONAL_HEADER
        return [
            opt.Magic,
            opt.MajorLinkerVersion,
            opt.MinorLinkerVersion,
            opt.SizeOfCode,
            opt.SizeOfInitializedData,
            opt.SizeOfUninitializedData,
            opt.AddressOfEntryPoint,
            opt.BaseOfCode,
            # The PE32+ (64-bit) optional header has no BaseOfData field;
            # default it so such files still yield their other 28 values.
            getattr(opt, 'BaseOfData', 0),
            opt.ImageBase,
            opt.SectionAlignment,
            opt.FileAlignment,
            opt.MajorOperatingSystemVersion,
            opt.MinorOperatingSystemVersion,
            opt.MajorImageVersion,
            opt.MinorImageVersion,
            opt.MajorSubsystemVersion,
            opt.MinorSubsystemVersion,
            opt.SizeOfImage,
            opt.SizeOfHeaders,
            opt.CheckSum,
            opt.Subsystem,
            opt.DllCharacteristics,
            opt.SizeOfStackReserve,
            opt.SizeOfStackCommit,
            opt.SizeOfHeapReserve,
            opt.SizeOfHeapCommit,
            opt.LoaderFlags,
            opt.NumberOfRvaAndSizes,
        ]
    except Exception as e:  # best effort: keep the row width on failure
        # The old handler printed an empty line and hid the actual error.
        print(e)
        return [0] * 29
def extract_features(pe):
    """Build one feature row: DOS header, file header, then optional header values."""
    feature_row = []
    for extractor in (extract_image_dos_header,
                      extract_file_header,
                      extract_optional_header):
        feature_row += extractor(pe)
    return feature_row
def main():
    """Write one CSV row of PE header features for every file under source_path.

    The header row is the three column-name lists followed by ``label``.
    Files that pefile cannot load, or whose features cannot be written, are
    reported on stdout and skipped.
    """
    source_path = r'C:\Users\833-24\Desktop\새 폴더\malware'
    output_file = r'C:\Users\833-24\Desktop\새 폴더\malware\result.csv'
    # NOTE(review): every row is labelled 'clean' although the folder is named
    # "malware" - confirm this is the intended label for this run.
    label = ['clean']
    output_abs = os.path.abspath(output_file)
    # ``with`` closes the CSV even if the walk raises.
    with open(output_file, 'wt') as f:
        writer = csv.writer(f)
        writer.writerow(IMAGE_DOS_HEADER + FILE_HEADER + OPTIONAL_HEADER + ['label'])
        for subdir, dirs, files in os.walk(source_path):
            for name in files:
                # Join with the directory being walked; joining with
                # source_path produced wrong paths for files in subfolders.
                input_file = os.path.join(subdir, name)
                # The result file lives inside the scanned tree; skip it.
                if os.path.abspath(input_file) == output_abs:
                    continue
                try:
                    pe = pefile.PE(input_file)
                except Exception as e:
                    print("Exception while loading file : " + str(e))
                    continue
                try:
                    features = extract_features(pe)
                    writer.writerow(features + label)
                except Exception as e:
                    print("Exception while opening and write CSV file : " + str(e))
    print('It work !!')


main()
N-Gram 소스코드
import os
import pefile
import pydasm
import operator
import csv
import hashlib
from itertools import chain
from capstone import *
from capstone.x86 import *
class NGRAM_FEATURE:
    """Collect opcode n-gram counts from PE files and write them to a CSV file.

    Attributes:
        output_file: path of the CSV written by write_csv_header() and
            write_csv_data().
        grams: n-gram -> count dictionary shared by every n_grams() call made
            with ``check == 1``.
    """

    def __init__(self, output_file):
        self.output_file = output_file
        self.grams = dict()

    def get_asm(self, mode, file):
        """Return the code of the PE file *file* as a list of tokens.

        The bytes are taken from the section whose virtual range contains the
        entry point; if no section matches, ``SizeOfCode`` bytes starting at
        the entry point are used.

        Args:
            mode: truthy -> return two-character hex strings, one per byte;
                falsy -> return the instruction mnemonics produced by
                capstone in 32-bit x86 mode.
            file: path of the PE file.
        """
        # Local import: hexlify works on Python 2 and 3, unlike
        # ``data.encode('hex')`` which only exists on Python 2.
        import binascii

        pe = pefile.PE(file)
        ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
        end = pe.OPTIONAL_HEADER.SizeOfCode
        for section in pe.sections:
            addr = section.VirtualAddress
            size = section.Misc_VirtualSize
            # ``<=`` so an entry point exactly at the section start matches.
            if addr <= ep < addr + size:
                ep = addr
                end = size
                break
        data = pe.get_memory_mapped_image()[ep:ep + end]
        if mode:
            hex_text = binascii.hexlify(data).decode('ascii')
            return [hex_text[i:i + 2] for i in range(0, len(hex_text), 2)]
        md = Cs(CS_ARCH_X86, CS_MODE_32)
        md.detail = False
        # 0x401000 is only the address capstone assigns to the first
        # instruction; mnemonics do not depend on it.
        return [insn.mnemonic for insn in md.disasm(data, 0x401000)]

    def gen_list_n_gram(self, n, asm_code):
        """Yield consecutive, non-overlapping slices of *asm_code* of length *n*.

        The last slice is shorter when ``len(asm_code)`` is not a multiple of n.
        """
        for i in range(0, len(asm_code), n):
            yield asm_code[i:i + n]

    def n_grams(self, n, asm_code, check):
        """Count the n-grams of *asm_code* and return the count dictionary.

        Args:
            n: number of tokens per n-gram.
            asm_code: list of string tokens.
            check: 1 -> add the counts to the shared ``self.grams``; any other
                value -> count into a fresh dictionary (only 0 and 1 were
                accepted before; other values raised UnboundLocalError).

        Returns:
            Dictionary mapping the space-joined n-gram to its count.
        """
        grams = self.grams if check == 1 else dict()
        for chunk in self.gen_list_n_gram(n, asm_code):
            key = ' '.join(chunk)
            grams[key] = grams.get(key, 0) + 1
        return grams

    def write_csv_header(self, csv_header):
        """Create (truncate) the output CSV and write its header row.

        The row is ``filename, MD5, <csv_header...>, label``.
        """
        header = ['filename', 'MD5'] + csv_header + ['label']
        # 'wa' is not a valid mode (Python 3 raises ValueError); 'w' is the
        # intended "start a new file" behaviour.
        with open(self.output_file, 'w') as out:
            csv.writer(out, delimiter=',').writerow(header)

    def count_n_gram(self, grams, header, label):
        """Return the counts of *header*'s n-grams in *grams*, followed by *label*.

        N-grams missing from *grams* count as 0.
        """
        grams_count = [grams.get(asm, 0) for asm in header]
        grams_count.append(label)
        return grams_count

    def calc_file_hash(self, file_path):
        """Return the hexadecimal MD5 digest of the file at *file_path*."""
        with open(file_path, 'rb') as f:  # closed even if read() fails
            return hashlib.md5(f.read()).hexdigest()

    def write_csv_data(self, data):
        """Append *data* as one row to the output CSV."""
        with open(self.output_file, 'a') as out:
            csv.writer(out, delimiter=',').writerow(data)
def _sample_paths(folder):
    """Yield (file name, full path) for every entry of *folder*."""
    for name in os.listdir(folder):
        yield name, os.path.join(folder, name)


def _accumulate_grams(nf, folder):
    """Add the 4-grams of every file in *folder* to the shared nf.grams."""
    for _name, file_path in _sample_paths(folder):
        nf.n_grams(4, nf.get_asm(0, file_path), 1)


def _write_rows(nf, folder, csv_header, label, tag):
    """Append one CSV row (name, MD5, counts..., label) per file in *folder*."""
    for name, file_path in _sample_paths(folder):
        print(tag + ' FILE NAME = ' + name)
        asm_code = nf.get_asm(0, file_path)
        grams = nf.n_grams(4, asm_code, 0)
        grams_count = nf.count_n_gram(grams, csv_header, label)  # grams, header, label
        md5_hash = nf.calc_file_hash(file_path)
        nf.write_csv_data([name, md5_hash] + grams_count)


def main():
    """Build the 4-gram feature CSV for the malware and normal sample folders.

    Pass 1 counts 4-grams over both folders and keeps the ``num_feature`` most
    frequent ones as CSV columns. Pass 2 writes one row per file with that
    file's own counts, labelled 1 (malware folder) or 0 (normal folder).
    """
    num_feature = 100
    mal_path = '/home/jungry/Desktop/test_folder/'
    nor_path = '/home/jungry/Desktop/test_normal/'
    output_file = '/home/jungry/Desktop/ngram123.csv'
    print('Get N-gram from files')
    nf = NGRAM_FEATURE(output_file)
    _accumulate_grams(nf, mal_path)
    _accumulate_grams(nf, nor_path)
    # Read the shared dictionary directly so empty folders do not leave the
    # name undefined.
    sort_grams = sorted(nf.grams.items(), key=operator.itemgetter(1), reverse=True)
    feature = sort_grams[0:num_feature]
    # Keep only the n-gram names. Flattening zip(*feature) and slicing let the
    # counts spill into the header when fewer than num_feature n-grams exist.
    csv_header = [gram for gram, _count in feature]
    nf.write_csv_header(csv_header)
    print('COMPLETE HEADER')
    # make csv_data
    _write_rows(nf, mal_path, csv_header, 1, 'MAL')
    print('***COMPLETE MALWARE DATA***')
    _write_rows(nf, nor_path, csv_header, 0, 'NOR')
    print('***COMPLETE NORMAL DATA***')


main()
Tkinter를 이용한 시각화 소스코드
#-*-coding:utf-8-*-
import os
import threading
import time
import subprocess
import sys
# Python 2 only: force UTF-8 as the default str/unicode conversion codec.
# Python 3 has neither the builtin reload() nor sys.setdefaultencoding(), and
# the tkinter imports below use the Python 3 module names, so the hack must
# be skipped there instead of raising NameError.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf-8')
import tkinter.ttk as ttk
import tkinter.font as tkFont
from tkinter import filedialog
from tkinter import *
# Module-level list, created empty; nothing in this part of the script reads it.
output = list()
# ============================ Window1 =================================
# Disabled code: the delete_window() definition below sits inside a bare
# triple-quoted string, so it is evaluated as a string expression and the
# function is never defined. run() builds a popup with the same "Delete?"
# label and Cancle/Delete buttons.
"""
def delete_window():
global delWin
delWin = Tk()
delWin.title("Finished Scan")
delWin.geometry("200x120")
delWin.resizable(0, 0)
popUp_label = Label(delWin, text='Delete?')
popUp_label.pack(pady=20)
cancleBtn = Button(delWin, text='Cancle', command=delWin.destroy)
cancleBtn.pack(side='bottom', pady=5)
delBtn = Button(delWin, text='Delete', command=midDel)
delBtn.pack(side='bottom')
delWin.mainloop()
"""
# ============================ Window2 =================================
def program_info():
    """Open the "Program Information" window describing the project.

    The window is a ``Toplevel`` child of the main ``root`` window, so it
    shares root's Tcl interpreter and event loop. The previous second
    ``Tk()`` with its own ``mainloop()`` ran in a separate interpreter from
    the ``StringVar``/``Font`` objects created without a master.
    """
    infoWindow = Toplevel(root)
    infoWindow.title("Program Information")
    infoWindow.geometry("640x480+100+100")
    infoWindow.resizable(0, 0)
    # NOTE(review): ``family`` receives a font file path, not a family name;
    # confirm the intended family, Tk may silently fall back to a default.
    test_font = tkFont.Font(
        root=infoWindow,
        family="/usr/share/fonts/Wemakeprice/Wemakeprice-Regular",
        size=12,
    )
    # Static captions: year/department, project name, team name.
    captions = (
        "2021 정보보호학과 졸업작품",
        "인공지능을 활용한 악성파일 탐지 시스템 ",
        "팀명 : 네카라쿠배",
    )
    program_def = '''윈도우의 PE 구조의 파일을 PE헤더, 4-gram(N-gram), 바이너리 이미지의
방법으로 특징을 추출하였고 분류 알고리즘을 이용하여 모델링을 진행,
가장 높은 정확도가 나오는 방법을 택해서 본 프로젝트를 진행하였다.
PE헤더와 check_packer 기능과 함께 추출한 특징을 RandomForest 알고리즘을
이용한 방법을 활용하여 Kicom 오픈소스 백신을 이용, GUI 프로그램을 만들어
프로젝트를 완료하였다.
'''
    manual = '''1. 스캔하고자 하는 폴더를 선택합니다.
2. 선택된 경로를 확인하고 스캔 버튼을 눌러서 결과를 확인합니다.
3. 악성 확률을 확인하고 파일을 지울지 선택합니다. '''
    # The text never changes, so plain ``text=`` replaces the StringVars.
    for caption in captions:
        Label(infoWindow, text=caption).pack(side='top')
    for body in (program_def, manual):
        Label(infoWindow, justify=LEFT, text=body, font=test_font).pack(side='top')
    closeBtn = Button(infoWindow, text='Close', command=infoWindow.destroy)
    closeBtn.pack(side='bottom', pady=10)
# ============================ Window3 =================================
def team_info():
    """Open the team's web page (http://www.nekabe.me) via the webbrowser module."""
    from webbrowser import open as open_url

    # new=1 is passed through unchanged from the original call.
    open_url("http://www.nekabe.me", new=1)
# ============================ Function ================================
def delMal():
    """Run ``python k2.py -l <path>`` and close the result popup afterwards.

    Reads the module globals ``path`` (set by openDir) and ``popUp`` (set by
    run). Blocks until the child process exits, which is why midDel() starts
    it on a separate thread.
    """
    # NOTE(review): '-l' is presumably k2.py's "delete detected files" switch;
    # confirm against k2.py's option list.
    comm = ['python', 'k2.py', '-l', path]
    print(comm)
    # The exit status is not used; the scanner's output goes to the console
    # (the old commented-out loop that copied it into outputBox was dropped).
    subprocess.call(comm)
    popUp.destroy()
def midDel():
    """Start delMal() on a new thread."""
    threading.Thread(target=delMal).start()
def run():
    """Scan the selected folder with ``python k2.py -f <path>`` and show the result.

    Every output line of the scanner is appended to ``outputBox``. When the
    scanner exits, a small "Delete?" popup is shown whose Delete button calls
    midDel(). The popup is stored in the module global ``popUp`` so delMal()
    can close it.
    """
    global popUp
    # ``path`` only exists after openDir() has run; report that instead of
    # failing with NameError inside the worker thread.
    target = globals().get('path')
    if not target:
        outputBox.insert('end', 'Select a folder first\n')
        outputBox.see(END)
        return
    comm = ['python', 'k2.py', '-f', target]
    print(comm)
    # universal_newlines=True makes the pipe yield text on Python 3 as well.
    # With bytes, the old ``res == ''`` end-of-stream test was never true and
    # the loop never finished.
    p = subprocess.Popen(comm, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         shell=False, universal_newlines=True)
    for res in iter(p.stdout.readline, ''):  # '' marks end of stream
        outputBox.insert('end', res.strip() + '\n')
        outputBox.see(END)
    p.stdout.close()
    p.wait()
    # NOTE(review): this creates a second Tk() and runs its mainloop on the
    # worker thread started by midRun(); confirm this is safe on the target
    # platform or move the popup to the main thread.
    popUp = Tk()
    popUp.title('Result Scan')
    popUp.geometry("200x120+200+200")
    popUp.resizable(0, 0)
    popUp_label = Label(popUp, text='Delete?')
    popUp_label.pack(pady=20)
    cancleBtn = Button(popUp, text='Cancle', command=popUp.destroy)
    cancleBtn.pack(side='bottom', pady=5)
    delBtn = Button(popUp, text='Delete', command=midDel)
    delBtn.pack(side='bottom')
    popUp.mainloop()
def midRun():
    """Start run() on a new thread."""
    threading.Thread(target=run).start()
# ============================ Function ================================
def openDir():
    """Ask the user for a folder and remember it in the module global ``path``.

    The chosen folder is also shown in the label bound to ``lblText``. If the
    dialog is cancelled, ``path`` and the label are left unchanged.
    """
    global path
    # "Deskop" in the old initialdir was a typo for "Desktop".
    dir_path = filedialog.askdirectory(parent=root, initialdir="/home/stud/Desktop/", title="Select Dir")
    if not dir_path:
        # Cancelled: do not overwrite an earlier selection with an empty value.
        return
    path = dir_path
    lblText.set('Selected\n' + path)
def force_quit():
    """Placeholder with no behaviour; it simply returns ``None``."""
    return None
# ============================== Main ==================================
# Main application window: 640x480, placed at screen offset (200, 200), not
# resizable in either direction.
root = Tk()
root.geometry('640x480+200+200')
root.resizable(0, 0)
root.title("네카라쿠배 v1.0")
# ============================= Frame1 =================================
# Top strip of the window: logo on the left, title on the right.
top_frame = Frame(root, width=640, height=100)
top_frame.pack(side='top', fill='x')
# title_font is created here but not referenced again in this section; the
# title label below passes its own ("Verdana", 24) font.
title_font = tkFont.Font(size=16)
# Frame_left, Frame_right
frame_left = Frame(top_frame, width=240, height=100)
frame_right = Frame(top_frame, width=400, height=100)
frame_left.pack(side='left', fill='x')
frame_right.pack(side='right', fill='x')
# logo_img
# 'jbu.png' is a relative path, so it is looked up in the current working
# directory. Earlier attempt kept for reference:
#jbu_logo = Image.open("./jbu.png")
image = PhotoImage(file='jbu.png')
logo_label = Label(frame_left, image=image)
logo_label.place(x=130, y=0)
# Title_label
title_text = StringVar()
# Three earlier Korean title captions (department / graduation project / team
# name) were tried here and replaced by the English title below.
title_text.set("MalWare Scanner")
title_label = Label(frame_right, textvariable=title_text, font=("Verdana", 24,))
title_label.place(x=0, y=40)
# ============================= Frame1-2 =================================
# Second strip: "[ Result ]" caption on the left, info buttons on the right.
top2_frame = Frame(root, width=600, height=40)
top2_frame.pack(side='top', fill='x', ipadx=35)
# result_label
result_text = StringVar()
# An earlier Korean caption for the result pane was replaced by the English one.
result_text.set("[ Result ]")
# Variant with an explicit ('Times', 8) font, kept for reference:
#result_label = Label(top2_frame, textvariable=result_text, font=('Times', 8))
result_label = Label(top2_frame, textvariable=result_text)
result_label.pack(side='left', padx=35)
# Empty label packed on the right before the buttons; presumably a spacer.
test_label = Label(top2_frame, text='')
test_label.pack(side='right', padx=15)
# team_info_button
# (an earlier variant used a Korean caption, width 14 and a ("Times", 8) font)
tInfoBtn = Button(top2_frame, width=12, text="CREDIT", command=team_info)
tInfoBtn.pack(side='right', padx=5, pady=2)
# program_info_button
# (an earlier variant used a Korean caption, width 14 and command=None)
pInfoBtn = Button(top2_frame, width=12, text="PROGRAM INFO", command=program_info)
pInfoBtn.pack(side='right')
# ============================== Frame2 ================================
# Middle area: the text box that run() appends the scanner output to.
middle_frame = Frame(root, width=640, height=280)
middle_frame.pack(side='top')
# Result
outputBox = Text(middle_frame)
outputBox.pack(expand=True)
# ============================== Frame3 ================================
# Bottom strip: selected-path label, the three action buttons, copyright text.
bottom_frame = Frame(root, width=640, height=80)
bottom_frame.pack(side='bottom', fill='x')
# lblText backs the path label; openDir() updates it after a folder is chosen.
lblText = StringVar()
lblText.set("Not Search <Dir>")
#label font
# Path Label
path_lbl = Label(bottom_frame, textvariable = lblText) #font=font
path_lbl.pack(pady=5)
# The buttons are packed with side="right", so the one packed first (Quit)
# ends up rightmost; the "-> n" numbers give the left-to-right order.
# Quit Button -> 3
quitBtn = Button(bottom_frame, width=12, text="Quit", command=root.destroy)
#quitBtn.pack(side="right", padx=5, pady=10)
quitBtn.pack(side="right", padx=5, pady=7)
# Scan Button -> 2
scanBtn = Button(bottom_frame, width=12, text="Scan", command=midRun)
scanBtn.pack(side="right", padx=5)
# Select Folder Button -> 1
openBtn = Button(bottom_frame, width=12, text="Select Folder", command=openDir)
openBtn.pack(side="right", padx=5)
copyright = StringVar()
# An earlier caption with the team name in Korean was replaced by this one.
copyright.set("Copyright 2021 Nekalikube")
# pack() returns None, so copyrightLabel is None rather than the Label widget.
copyrightLabel = Label(bottom_frame, textvariable=copyright).pack(side='left', padx=5)
# ============================== END CODE ==============================
# Enter the Tk event loop; returns once the root window is destroyed.
root.mainloop()
반응형
'PYTHON > PROJECT' 카테고리의 다른 글
Python Project 05. 디스코드 봇 "Manta" - (2) 사다리타기 (2) | 2021.12.31 |
---|---|
Python Project 05. 디스코드 봇 "Manta" - (1) 롤 전적검색 (0) | 2021.12.31 |
Python Project 05. 디스코드 봇 "Manta" - 구상 (0) | 2021.12.31 |
Python Project 04. 영화리뷰 긍정부정 예측하기 - (3) 전처리 , 학습 (0) | 2020.12.21 |
Python Project 04. 영화리뷰 긍정부정 예측하기 -(2) 간단한 전처리 (0) | 2020.12.21 |
Comments