-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathinquestlabs.py
executable file
·1507 lines (1193 loc) · 58 KB
/
inquestlabs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
"""
InQuest Labs Command Line Driver
Usage:
inquestlabs [options] dfi list
inquestlabs [options] dfi details <sha256> [--attributes]
inquestlabs [options] dfi download <sha256> <path> [--encrypt]
inquestlabs [options] dfi attributes <sha256> [--filter=<filter>]
inquestlabs [options] dfi search (code|context|metadata|ocr) <keyword>
inquestlabs [options] dfi search (md5|sha1|sha256|sha512) <hash>
inquestlabs [options] dfi search (domain|email|filename|filepath|ip|registry|url|xmpid) <ioc>
inquestlabs [options] dfi sources
inquestlabs [options] dfi upload <path>
inquestlabs [options] iocdb list
inquestlabs [options] iocdb search <keyword>
inquestlabs [options] iocdb sources
inquestlabs [options] repdb list
inquestlabs [options] repdb search <keyword>
inquestlabs [options] repdb sources
inquestlabs [options] yara (b64re|base64re) <regex> [(--big-endian|--little-endian)]
inquestlabs [options] yara hexcase <instring>
inquestlabs [options] yara uint <instring> [--offset=<offset>] [--hex]
inquestlabs [options] yara widere <regex> [(--big-endian|--little-endian)]
inquestlabs [options] yara cidr <ipv4>
inquestlabs [options] lookup ip <ioc>
inquestlabs [options] lookup domain <ioc>
inquestlabs [options] report <ioc>
inquestlabs [options] stats
inquestlabs [options] setup <apikey>
inquestlabs [options] trystero list-days
inquestlabs [options] trystero list-samples <yyyy-mm-dd>
Options:
--attributes Include attributes with DFI record.
--api=<apikey> Specify an API key.
--big-endian Toggle big endian.
--config=<config> Configuration file with API key [default: ~/.iqlabskey].
--debug Docopt debugging.
--encrypt Zip sample with password 'infected' before downloading.
--filter=<filter> Filter by attributes type (domain, email, filename, filepath, ip, registry, url, xmpid)
-h --help Show this screen.
--hex Treat <instring> as hex bytes.
-l --limits Show remaining API credits and limit reset window.
--little-endian Toggle little endian.
--offset=<offset> Specify an offset other than 0 for the trigger.
--proxy=<proxy> Intermediate proxy
--timeout=<timeout> Maximum amount of time to wait for IOC report.
--verbose=<level> Verbosity level, outputs to stderr [default: 0].
--version Show version.
"""
# python 2/3 compatability.
from __future__ import print_function
try:
import configparser
except:
import ConfigParser as configparser
# batteries not included.
import docopt
import requests
# disable ssl warnings from requests.
try:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except:
pass
# standard libraries.
import multiprocessing
import ipaddress
import hashlib
import random
import time
import json
import sys
import os
import re
# from importlib.metadata import version
# extract version from installed package metadata
__application_name__ = "inquestlabs"
__version__ = "1.2.4"
# __version__ = version(__application_name__)
__full_version__ = f"{__application_name__} {__version__}"
VALID_CAT = ["ext", "hash", "ioc"]
VALID_EXT = ["code", "context", "metadata", "ocr"]
VALID_HASH = ["md5", "sha1", "sha256", "sha512"]
VALID_IOC = ["domain", "email", "filename", "filepath", "ip", "registry", "url", "xmpid"]
VALID_DOMAIN = re.compile("[a-zA-Z0-9-_]+\.[a-zA-Z0-9-_]+")
# verbosity levels.
INFO = 1
DEBUG = 2
########################################################################################################################
def worker_proxy (labs, endpoint, arguments, response):
"""
proxy function for multiprocessing wrapper used by inquestlabs_api.report()
"""
response[endpoint] = getattr(labs, endpoint)(*arguments)
########################################################################################################################
class inquestlabs_exception(Exception):
pass
########################################################################################################################
class inquestlabs_api:
"""
InQuest Labs API Wrapper
https://labs.inquest.net
"""
####################################################################################################################
def __init__ (self, api_key=None, config=None, proxies=None, base_url=None, retries=3, verify_ssl=True, verbose=0):
"""
Instantiate an interface to InQuest Labs. API key is optional but sourced from (in order): argument, environment
variable, or configuration file. Proxy dictionary is a raw pass thru to python-requests, valid keys are 'http'
and 'https'.
:type api_key: str
:param api_key: API key, optional, can also be supplied via environment variable 'IQLABS_APIKEY'.
:type config: str
:param config: Path to configuration file containing API key, default is '~/.iqlabskey'.
:type proxies: dict
:param proxies: Optional proxy dictionary to pass down to underlying python-requests library.
:type base_url: str
:param base_url: API endpoint.
:type retries: int
:param retries: Number of times to attempt API request before giving up.
:type verify_ssl: bool
:param verify_ssl: Toggles SSL certificate verification when communicating with the API.
:type verbose: int
:param verbose: Values greater than zero provide increased verbosity.
"""
# internalize supplied parameters.
self.api_key = api_key
self.base_url = base_url
self.config_file = config
self.retries = retries
self.proxies = proxies
self.verify_ssl = verify_ssl
self.verbosity = verbose
# internal rate limit tracking.
self.rlimit_requests_remaining = None # requests remaining in this rate limit window.
self.rlimit_reset_epoch_time = None # time, in seconds from epoch, that rate limit window resets.
self.rlimit_reset_epoch_ctime = None # same as above, but in ctime human readable format.
self.rlimit_seconds_to_reset = None # seconds to reset time.
self.api_requests_made = 0 # keep track of how many API requests we've made.
# if no base URL was specified, use the default.
if self.base_url is None:
self.base_url = "https://labs.inquest.net/api"
self.__VERBOSE("base_url=%s" % self.base_url, DEBUG)
# if no config file was supplied, use a default path of ~/.iqlabskey.
if self.config_file is None:
self.config_file = os.path.join(os.path.expanduser("~"), ".iqlabskey")
elif "~" in self.config_file:
self.config_file = os.path.expanduser(self.config_file)
self.__VERBOSE("config_file=%s" % self.config_file, DEBUG)
# if an API key was specified, note the source.
if self.api_key:
self.api_key_source = "supplied"
# otherwise, we don't have an API source yet, we'll check the environment and config files though.
else:
self.api_key_source = "N/A"
# check the environment for one
self.api_key = os.environ.get("IQLABS_APIKEY")
if self.api_key:
self.api_key_source = "environment"
# if we still don't have an API key, try loading one from the config file.
else:
# config file format:
# $ cat .iqlabskey
# [inquestlabs]
# apikey: deadbeefdeadbeefdeadbeefdeadbeefdeadbeef
if os.path.exists(self.config_file) and os.path.isfile(self.config_file):
config = configparser.ConfigParser()
try:
config.read(self.config_file)
except:
raise inquestlabs_exception("invalid configuration file: %s" % self.config_file)
try:
self.api_key = config.get("inquestlabs", "apikey")
except:
raise inquestlabs_exception("unable to find inquestlabs.apikey in: %s" % self.config_file)
# update the source, include the path.
self.api_key_source = "config: %s" % self.config_file
# NOTE: if we still don't have an API key that's fine! InQuest Labs will simply work with some rate limits.
self.__VERBOSE("api_key=%s" % self.api_key, DEBUG)
self.__VERBOSE("api_key_source=%s" % self.api_key_source, INFO)
####################################################################################################################
def API (self, api, data=None, path=None, method="GET", raw=False, params=None):
"""
Internal API wrapper.
:type api: str
:param api: API endpoint, appended to base URL.
:type data: dict
:param data: Optional data dictionary to pass to endpoint.
:type path: str
:param path: Optional path to file to pass to endpoint.
:type method: str
:param method: API method, one of "GET" or "POST".
:type raw: bool
:param raw: Default behavior is to expect JSON encoded content, raise this flag to expect raw data.
:type method: str
:param method: Set a parameter for the request.
:rtype: dict | str
:return: Response dictionary or string if 'raw' flag is raised.
"""
assert method in ["GET", "POST"]
# if a file path was supplied, convert to a dictionary compatible with requests and the labs API.
files = None
if path:
files = dict(file=open(path, "rb"))
# initialize headers with a custom user-agent and if an API key is available, add an authorization header.
headers = \
{
"User-Agent" : "python-inquestlabs/%s" % __version__
}
if self.api_key:
headers["Authorization"] = "Basic %s" % self.api_key
# build the keyword arguments that will be passed to requests library.
kwargs = \
{
"data" : data,
"files" : files,
"headers" : headers,
"proxies" : self.proxies,
"verify" : self.verify_ssl,
"params" : params
}
# make attempts to dance with the API endpoint, use a jittered exponential back-off delay.
last_exception = None
endpoint = self.base_url + api
attempt = 0
self.__VERBOSE("%s %s" % (method, endpoint), INFO)
while 1:
try:
response = requests.request(method, endpoint, **kwargs)
self.api_requests_made += 1
self.__VERBOSE("[%d] %s" % (self.api_requests_made, kwargs), DEBUG)
break
except Exception as e:
last_exception = e
self.__VERBOSE("API exception: %s" % e, INFO)
# 0.4, 1.6, 6.4, 25.6, ...
time.sleep(random.uniform(0, 4 ** attempt * 100 / 1000.0))
attempt += 1
# retries exhausted.
if attempt == self.retries:
message = "exceeded %s attempts to communicate with InQuest Labs API endpoint %s."
message %= self.retries, endpoint
if last_exception:
message += "\nlast exception:\n%s" % str(last_exception)
raise inquestlabs_exception(message)
# update internal rate limit tracking variables.
if hasattr(response, "headers"):
self.rlimit_requests_remaining = response.headers.get('X-RateLimit-Remaining')
self.rlimit_reset_epoch_time = response.headers.get('X-RateLimit-Reset')
if self.rlimit_requests_remaining:
self.rlimit_requests_remaining = int(self.rlimit_requests_remaining)
if self.rlimit_reset_epoch_time:
self.rlimit_reset_epoch_time = int(self.rlimit_reset_epoch_time)
self.rlimit_seconds_to_reset = int(self.rlimit_reset_epoch_time - time.time())
self.rlimit_reset_epoch_ctime = time.ctime(self.rlimit_reset_epoch_time)
self.__VERBOSE("API status_code=%d" % response.status_code, INFO)
self.__VERBOSE(response.content, DEBUG)
# all good.
if response.status_code == 200:
# if the raw flag was raised, return raw content now.
if raw:
return response.content
# otherwise, we convert the assumed JSON response to a python dictionary.
response_json = response.json()
# with a 200 status code, success should always be true...
if response_json['success']:
return response_json['data']
# ... but let's handle corner cases where it may not be.
else:
message = "status=200 but error communicating with %s: %s"
message %= endpoint, response_json.get("error", "n/a")
raise inquestlabs_exception(message)
# rate limit exhaustion.
elif response.status_code == 429:
raise inquestlabs_exception("status=429 rate limit exhausted!")
# something else went wrong.
else:
message = "status=%d error communicating with %s: "
message %= response.status_code, endpoint
try:
response_json = response.json()
message += response_json.get("error", "n/a")
except:
message += str(response.content)
raise inquestlabs_exception(message)
####################################################################################################################
def __HASH (self, path=None, bytes=None, algorithm="md5", block_size=16384, fmt="digest"):
"""
Return the selected algorithms crytographic hash hex digest of the given file.
:type path: str
:param path: Path to file to hash or None if supplying bytes.
:type bytes: str
:param bytes: str bytes to hash or None if supplying a path to a file.
:type algorithm: str
:param algorithm: One of "md5", "sha1", "sha256" or "sha512".
:type block_size: int
:param block_size: Size of blocks to process.
:type fmt: str
:param fmt: One of "digest" (str), "raw" (hashlib object), "parts" (array of numeric parts).
:rtype: str
:return: Hash as hex digest.
"""
def chunks (l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
algorithm = algorithm.lower()
if algorithm == "md5": hashfunc = hashlib.md5()
elif algorithm == "sha1": hashfunc = hashlib.sha1()
elif algorithm == "sha256": hashfunc = hashlib.sha256()
elif algorithm == "sha512": hashfunc = hashlib.sha512()
# hash a file.
if path:
with open(path, "rb") as fh:
while 1:
data = fh.read(block_size)
if not data:
break
hashfunc.update(data)
# hash a stream of bytes.
elif bytes:
hashfunc.update(bytes)
# error.
else:
raise inquestlabs_exception("hash expects either 'path' or 'bytes'.")
# return multiplexor.
if fmt == "raw":
return hashfunc
elif fmt == "parts":
return map(lambda x: int(x, 16), list(chunks(hashfunc.hexdigest(), 8)))
else: # digest
return hashfunc.hexdigest()
####################################################################################################################
def __HASH_VALIDATE (self, hash_str, length=None):
"""
Determine if the given hash string contains valid hex chars for the specified length or entirely, if left out.
:type hash_str: str
:param hash_str: Hash string to verify.
:type length: int
:param length: Number of characters in hash string.
:rtype: bool
:return: True is hash string is valid, False otherwise.
"""
if not hash_str:
return None
if length and len(hash_str) != length:
return False
if re.match("[0-9a-fA-F]+", hash_str, re.I):
return True
return False
####################################################################################################################
def __VERBOSE (self, message, verbosity=INFO):
"""
Outputs 'message' to stderr if instance verbosity is equal to or greater than the supplied verbosity.
:type message: str
:param message: Path to file to hash or None if supplying bytes.
:type verbosity: int
:param verbosity: Minimum verbosity level required to display message.
"""
if self.verbosity >= verbosity:
sys.stderr.write("[verbosity=%d] %s\n" % (self.verbosity, message))
####################################################################################################################
# hash shorcuts.
def md5 (self, path=None, bytes=None): return self.__HASH(path=path, bytes=bytes, algorithm="md5")
def sha1 (self, path=None, bytes=None): return self.__HASH(path=path, bytes=bytes, algorithm="sha1")
def sha256 (self, path=None, bytes=None): return self.__HASH(path=path, bytes=bytes, algorithm="sha256")
def sha512 (self, path=None, bytes=None): return self.__HASH(path=path, bytes=bytes, algorithm="sha512")
def is_md5 (self, hash_str): return self.__HASH_VALIDATE(hash_str, 32)
def is_sha1 (self, hash_str): return self.__HASH_VALIDATE(hash_str, 40)
def is_sha256 (self, hash_str): return self.__HASH_VALIDATE(hash_str, 64)
def is_sha512 (self, hash_str): return self.__HASH_VALIDATE(hash_str, 128)
####################################################################################################################
def dfi_attributes (self, sha256, filter_by=None):
"""
Retrieve attributes for a given file by SHA256 hash value.
:type sha256: str
:param sha256: SHA256 hash for the file we are interested in.
:type filter_by: str
:param filter_by: Optional filter, can be one of 'domain', 'email', 'filename', 'filepath', ip', 'registry', 'url', 'xmpid'.
:rtype: dict
:return: API response.
"""
# if a filter is specified, sanity check.
if filter_by:
filter_by = filter_by.lower()
if filter_by not in VALID_IOC:
message = "invalid attribute filter '%s'. valid filters include: %s"
message %= filter_by, ", ".join(VALID_IOC)
raise inquestlabs_exception(message)
# dance with the API.
attributes = self.API("/dfi/details/attributes", dict(sha256=sha256))
# filter if necessary.
if filter_by:
# sample data:
# [
# {
# "category": "ioc",
# "attribute": "domain",
# "count": 1,
# "value": "ancel.To"
# },
# {
# "category": "ioc",
# "attribute": "domain",
# "count": 1,
# "value": "Application.Top"
# }
# ]
attributes = [attr for attr in attributes if attr['attribute'] == filter_by]
# return attributes.
return attributes
####################################################################################################################
def dfi_details (self, sha256, attributes=False):
"""
Retrieve details for a given file by SHA256 hash value. Optionally, pull attributes in a second API request
and append to the data dictionary under the key 'attributes'.
Returned dictionary keys and value types include::
analysis_completed: bool
classification: MALICIOUS|BENIGN
ext_code: str
ext_context: str
ext_metadata: str
ext_ocr: str
file_type: CAB|DOC|DOCX|EML|MSI|OLE|PCAP|PPT|TNEF|XLS
first_seen: str ex: Thu, 07 Nov 2019 21:26:53 GMT
inquest_alerts: dict keys=category,description,reference,title
inquest_dfi_size: int
last_inquest_dfi: str
last_inquest_featext: str
last_updated: str
len_code: int
len_context: int
len_metadata: int
len_ocr: int
md5: str
mime_type: str
sha1: str
sha256: str
sha512: str
size: int
subcategory: str
subcategory_url: str
virus_total: str
:type sha256: str
:param sha256: SHA256 hash for the file we are interested in.
:type attributes: bool
:param attributes: Raise this flag to includes 'attributes' subkey.
:rtype: dict
:return: API response.
"""
assert self.is_sha256(sha256)
# API dance.
data = self.API("/dfi/details", dict(sha256=sha256))
if attributes:
data['attributes'] = self.dfi_attributes(sha256)
return data
####################################################################################################################
def dfi_download (self, sha256, path, encrypt=False):
"""
Download requested file and save to path.
:type sha256: str
:param sha256: SHA256 hash for the file we are interested in.
:type path: str
:param path: Where we want to save the file.
:type encrypt: bool
:param encrypt: Raise this flag to download the file inside a Zip file encrypted with the password 'infected'.
"""
assert self.is_sha256(sha256)
# NOTE: we're reading the file directly into memory here! not worried about it as the files are small and we
# done anticipate any OOM issues.
data = self.API("/dfi/download", dict(sha256=sha256, encrypt_download=encrypt), raw=True)
# if we requested a raw download, then ensure we got what we were looking for.
if not encrypt:
calculated = self.sha256(bytes=data)
if calculated != sha256:
message = "failed downloading file! expected sha256=%s calculated sha256=%s"
message %= sha256, calculated
raise inquestlabs_exception(message)
# write the file to disk.
with open(path, "wb+") as fh:
fh.write(data)
####################################################################################################################
def dfi_list (self, malicious=None, kind=None, has_code=None, has_context=None, has_metadata=None, has_ocr=None):
"""
Retrieve the most recent DFI entries. Example dictionary returned in list::
{'analysis_completed': True,
'classification': 'MALICIOUS',
'file_type': 'DOC',
'first_seen': 'Thu, 07 Nov 2019 21:26:53 GMT',
'inquest_alerts': [],
'last_inquest_featext': 'Thu, 07 Nov 2019 21:30:23 GMT',
'len_code': 10963,
'len_context': 24,
'len_metadata': 1021,
'len_ocr': 0,
'mime_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'sha256': 'f7702e873c1a26e8171d771180108a9735cb5a2b69958e14b51eb572973cfb7b',
'size': 821038,
'subcategory': 'macro_hunter',
'subcategory_url': 'https://github.com/InQuest/yara-rules/blob/master/labs.inquest.net/macro_hunter.rule'}
:type malicious: bool
:param malicious: Filter results by whether or not they are malicious.
:type kind: str
:param kind: Filter list by high level type, ex: 'DOC', 'DOCX', 'OLE', 'PPT', 'XLS'.
:type has_code: int
:param has_code: Filter results by whether or not they contain X bytes of embedded logic.
:type has_context: int
:param has_context: Filter results by whether or not they contain X bytes of semantic information.
:type has_metadata: int
:param has_metadata: Filter results by whether or not they contain X bytes of any metadata.
:type has_ocr: int
:param has_ocr: Filter results by whether or not they contain X bytes of OCR extracted semantic data.
:rtype: list
:return: List of dictionaries.
"""
filtered = []
for entry in self.API("/dfi/list"):
# process filters as disqualifiers.
if malicious == True and entry['classification'] != "MALICIOUS":
continue
if malicious == False and entry['classification'] != "UNKNOWN":
continue
if kind is not None and entry['file_type'] != kind:
continue
if has_code is not None and entry['len_code'] < has_code:
continue
if has_context is not None and entry['len_context'] < has_context:
continue
if has_metadata is not None and entry['len_metadata'] < has_metadata:
continue
if has_ocr is not None and entry['len_ocr'] < has_ocr:
continue
# if we're still here, we keep the entry.
filtered.append(entry)
return filtered
####################################################################################################################
def dfi_search (self, category, subcategory, keyword):
"""
Search DFI category/subcategory by keyword. Valid categories include: 'ext', 'hash', and 'ioc'. Valid
subcategories for each include: ext: 'code', 'context', 'metadata', and 'ocr'. hash: 'md5', 'sha1', 'sha256',
and 'sha512'. ioc: 'domain', 'email', 'filename', 'filepath', ip', 'registry', url', 'xmpid'. See
https://labs.inquest.net for more information.
Example dictionary returned in list of matched entries::
{'analysis_completed': True,
'classification': 'MALICIOUS',
'file_type': 'DOC',
'first_seen': 'Thu, 07 Nov 2019 21:26:53 GMT',
'inquest_alerts': [],
'last_inquest_featext': 'Thu, 07 Nov 2019 21:30:23 GMT',
'len_code': 10963,
'len_context': 24,
'len_metadata': 1021,
'len_ocr': 0,
'mime_type': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'sha256': 'f7702e873c1a26e8171d771180108a9735cb5a2b69958e14b51eb572973cfb7b',
'size': 821038,
'subcategory': 'macro_hunter',
'subcategory_url': 'https://github.com/InQuest/yara-rules/blob/master/labs.inquest.net/macro_hunter.rule'}
:type category: str
:param category: Search category, one of 'ext', 'hash', or 'ioc'.
:type subcategory: str
:param subcategory: Search subcategory.
:type keyword: str
:param keyword: Keyword, hash, or IOC to search for.
:rtype: list
:return: API response.
"""
# normalize to lowercase.
category = category.lower()
subcategory = subcategory.lower()
# sanity check.
if category not in VALID_CAT:
message = "invalid category '%s'. valid categories include: %s"
message %= category, ", ".join(VALID_CAT)
raise inquestlabs_exception(message)
for c, v in zip(VALID_CAT, [VALID_EXT, VALID_HASH, VALID_IOC]):
if category == c and subcategory not in v:
message = "invalid subcategory '%s' for category '%s'. valid subcategories include: %s"
message %= subcategory, category, ", ".join(v)
raise inquestlabs_exception(message)
# API dance.
if category == "ext":
subcategory = "ext_" + subcategory
if category == "hash":
data = dict(hash=keyword)
else:
data = dict(keyword=keyword)
return self.API("/dfi/search/%s/%s" % (category, subcategory), data)
####################################################################################################################
def dfi_sources (self):
"""
Retrieves the list of YARA hunt rules that run atop of Virus Total Intelligence and fuel the majority of the
DFI corpus.
:rtype: dict
:return: API response.
"""
return self.API("/dfi/sources")
####################################################################################################################
def dfi_upload (self, path):
"""
Uploads a file to InQuest Labs for Deep File Inspection (DFI). Note that the file must be one of doc, docx, ppt,
pptx, xls, xlsx.
:type path: str
:param path: Path to file to upload.
:rtype: dict
:return: API response.
"""
VALID_TYPES = ["doc", "docx", "ppt", "pptx", "xls", "xlsx"]
# ensure the path exists and points to a file.
if not os.path.exists(path) or not os.path.isfile(path):
raise inquestlabs_exception("invalid file path specified for upload: %s" % path)
# ensure the file is an OLE (pre 2007 Office file) or ZIP (post 2007 Office file).
with open(path, "rb") as fh:
if fh.read()[:2] not in [b"\xD0\xCF", b"PK"]:
message = "unsupported file type for upload, valid files include: %s, etc..."
message %= ", ".join(VALID_TYPES)
raise inquestlabs_exception(message)
# dance with the API.
return self.API("/dfi/upload", method="POST", path=path)
####################################################################################################################
def iocdb_list (self, kind=None, ref_link_keyword=None, ref_text_keyword=None):
"""
Retrieve a list of the most recent entries added to the InQuest Labs IOC database. Example data::
{
"artifact": "85b936960fbe5100c170b777e1647ce9f0f01e3ab9742dfc23f37cb0825b30b5",
"artifact_type": "hash",
"created_date": "Thu, 14 Nov 2019 19:14:55 GMT",
"reference_link": "http://feedproxy.google.com/~r/feedburner/Talos/~3/cWpezcI4rFw/threat-source-newsletter-nov-14-2019.html",
"reference_text": "Newsletter compiled by Jon Munshaw. Welcome to this week's Threat Source newsletter - the perfect place to get caught up on all things Talos..."
}
:type kind: str
:param kind: Filter results by data type, can be one of 'ip', 'url', 'domain', 'yara', 'hash'.
:type ref_link_keyword: str
:param ref_link_keyword: Filter results by keyword in reference link.
:type ref_text_keyword: str
:param ref_text_keyword: Filter results by keyword in reference text.
:rtype: dict
:return: API response.
"""
filtered = []
for entry in self.API("/iocdb/list"):
# process filters as disqualifiers.
if kind is not None and not entry['artifact_type'].startswith(kind.lower()):
continue
if ref_link_keyword is not None and ref_link_keyword not in entry['reference_link'].lower():
continue
if ref_text_keyword is not None and ref_text_keyword not in entry['reference_text'].lower():
continue
# if we're still here, we keep the entry.
filtered.append(entry)
return filtered
####################################################################################################################
def iocdb_search (self, keyword):
"""
Search the InQuest Labs IOC database for entries matching the keyword.
:type keyword: str
:param keyword: Search term.
:rtype: dict
:return: API response.
"""
return self.API("/iocdb/search", dict(keyword=keyword))
####################################################################################################################
def iocdb_sources (self):
"""
Retrieves the list of sources that fuel the InQuest Labs IOC database.
:rtype: dict
:return: API response.
"""
return self.API("/iocdb/sources")
########################################################################################################################
def is_ipv4 (self, s):
# we prefer to use the ipaddress third-party module here, but fall back to a regex solution.
try:
import ipaddress
except:
if re.match("^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", s):
return True
else:
return False
# python 2/3 compat
try:
s = unicode(s)
except:
pass
# is instance of IPv4 address?
try:
return isinstance(ipaddress.ip_address(s), ipaddress.IPv4Address)
except:
return False
########################################################################################################################
def is_ipv6 (self, s):
# best effort pull in third-party module.
try:
import ipaddress
except:
return None
# python 2/3 compat
try:
s = unicode(s)
except:
pass
# is instance of IPv6 address?
try:
return isinstance(ipaddress.ip_address(s), ipaddress.IPv6Address)
except:
return False
####################################################################################################################
def is_domain (self, s):
return VALID_DOMAIN.match(s)
####################################################################################################################
def is_ip (self, s):
return self.is_ipv4(s) or self.is_ipv6(s)
####################################################################################################################
def lookup (self, kind, ioc):
"""
Lookup information regarding IP address or Domain Name.
:type kind: str
:param kind: One of "IP" or "Domain".
:type ioc: str
:param ioc: Indicator to lookup.
:rtype: dict
:return: API response.
"""
kind = kind.lower()
assert kind in ["ip", "domain"]
return self.API("/lookup/%s" % kind, dict(indicator=ioc))
####################################################################################################################
def rate_limit_banner (self):
"""
Returns a string describing number of API requests made since instantiation, remaining API credits (if a rate
limit is imposed), and when the rate limit window resets.
:rtype: str
:return: Request and rate limit information, in human readable format.
"""
if not self.api_requests_made:
return "Rate limit information not available, no API requests made."
if self.rlimit_requests_remaining:
limit_banner = "%d API requests made. %d API requests remaining. Rate limit window resets on %s."
limit_banner %= self.api_requests_made, self.rlimit_requests_remaining, self.rlimit_reset_epoch_ctime
else:
limit_banner = "%d API requests made. No rate limit! API key sourced from %s."
limit_banner %= self.api_requests_made, self.api_key_source
return limit_banner
####################################################################################################################
def repdb_list (self, kind=None, source=None):
"""
Retrieve a list of the most recent entries added to the InQuest Labs reputation database. Example data::
{
"created_date": "Thu, 14 Nov 2019 18:22:00 GMT",
"data": "beautyevent.ru/Invoice-for-j/b-03/05/2018/",
"data_type": "url",
"derived": "beautyevent.ru",
"derived_type": "domain",
"source": "urlhaus",
"source_url": "https://urlhaus.abuse.ch/host/beautyevent.ru"
}
:type kind: str
:param kind: Filter results by data type, can be one of 'ip', 'url', 'domain', 'asn'.
:type source: str
:param source: Filter results by source, examples include: 'alienvault', 'blocklist', 'urlhaus', etc..
:rtype: dict
:return: API response.
"""
filtered = []
for entry in self.API("/repdb/list"):
# process filters as disqualifiers.
if kind is not None and not entry['data_type'].startswith(kind.lower()):
continue
if source is not None and not entry['source'].startswith(source.lower()):
continue
# if we're still here, we keep the entry.
filtered.append(entry)
return filtered
####################################################################################################################
def repdb_search (self, keyword):
"""
Search the InQuest Labs reputation database for entries matching the keyword.
:type keyword: str
:param keyword: Search term.
:rtype: dict
:return: API response.
"""
return self.API("/repdb/search", dict(keyword=keyword))
####################################################################################################################
def repdb_sources (self):
"""
Retrieves the list of sources that fuel the InQuest Labs reputaiton database.
:rtype: dict
:return: API response.
"""
return self.API("/repdb/sources")
####################################################################################################################
def report (self, ioc, timeout=None):