summaryrefslogtreecommitdiff
path: root/examples/py-ja3-checker/py-ja3-checker.py
blob: 3e7e9418f8e7facfd92cdbca8204ef3b70ede1a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3

import io
import json
import os
import pandas
import requests
import sys
import time

# Make the nDPIsrvd module importable.  The candidate directories are
# resolved against the absolute location of this script: with a bare
# os.path.dirname(sys.argv[0]) an invocation such as
# `python3 py-ja3-checker.py` yields '' and the entries would turn into
# '/../share/nDPId', i.e. relative to the filesystem root.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(sys.argv[0]))
sys.path.append(_SCRIPT_DIR + '/../share/nDPId')
sys.path.append(_SCRIPT_DIR + '/../usr/share/nDPId')
try:
    import nDPIsrvd
    from nDPIsrvd import nDPIsrvdSocket
except ImportError:
    # Not found in the share directories: retry with ../../dependencies.
    sys.path.append(_SCRIPT_DIR + '/../../dependencies')
    import nDPIsrvd
    from nDPIsrvd import nDPIsrvdSocket

# Cache of JA3ER lookup results, keyed by JA3 hash.
ja3_fps = {}
# 1 hour = 3600 sec/hour = (60 minutes/hour) * (60 seconds/minute)
JA3_FP_MAX_AGE = 60 * 60

# Blacklist table; stays None until downloadJA3Blacklist() stores a DataFrame.
ja3_bl = None

# JA3 hashes whose blacklist entry was already printed (hash -> True).
ja3_bl_printed = {}


def downloadJA3Blacklist():
    """Download the abuse.ch SSLBL JA3 fingerprint CSV into ``ja3_bl``.

    On success the parsed table is stored in the module-level ``ja3_bl``
    as a pandas DataFrame.

    Returns:
        True if the list was fetched with HTTP status 200 and parsed,
        False for any other status code or when the request itself failed.
    """
    global ja3_bl
    try:
        # Bound the request so a stalled server cannot block start-up forever.
        response = requests.get(
            'https://sslbl.abuse.ch/blacklist/ja3_fingerprints.csv',
            timeout=30
        )
    except requests.exceptions.RequestException:
        # The caller already handles a False result; DNS, connection and
        # timeout errors must not abort the script.
        return False
    if response.status_code != 200:
        return False
    # header=9 picks the row pandas uses for the column names.
    # NOTE(review): presumably the '# ja3_md5,...' line that follows the
    # feed's comment preamble -- confirm against the current feed format.
    ja3_bl = pandas.read_csv(io.StringIO(response.text), header=9)
    return True


def getBlacklisted(ja3_hash):
    """Return the rows of the blacklist table that match ``ja3_hash``.

    The hash is compared with the '# ja3_md5' column of the module-level
    ``ja3_bl`` table; the result is empty when nothing matches.
    """
    matches = ja3_bl['# ja3_md5'] == ja3_hash
    return ja3_bl[matches]


def checkBlacklisted(ja3_hash):
    """Print the blacklist entry for ``ja3_hash`` once, if there is one.

    Does nothing when no blacklist is loaded (``ja3_bl`` is None), when the
    hash was already reported, or when the hash is not listed.  A reported
    hash is remembered in ``ja3_bl_printed``.
    """
    if ja3_bl is None:
        return
    # Consult the cheap "already reported" dict before filtering the table,
    # so repeated flows with a known hash do not rescan the blacklist.
    if ja3_hash in ja3_bl_printed:
        return
    csv_entry = getBlacklisted(ja3_hash)
    if csv_entry.empty:
        return
    print('Found CSV JA3 blacklist entry:')
    print(csv_entry)
    ja3_bl_printed[ja3_hash] = True

class JA3ER:
    """A decoded JA3ER response together with the time it was stored."""

    def __init__(self, json_dict):
        # Remember when the entry was created so its age can be checked.
        self.last_checked = time.time()
        self.json = json_dict

    def isTooOld(self):
        """Return True if the entry is JA3_FP_MAX_AGE seconds old or older."""
        age = time.time() - self.last_checked
        return age >= JA3_FP_MAX_AGE


def isJA3InfoTooOld(ja3_hash):
    """Tell whether ``ja3_hash`` has to be looked up (again).

    Returns True when the hash has no entry in ``ja3_fps`` or when its
    entry reports itself as too old; a message is printed in the latter
    case.  Returns False otherwise.
    """
    # Unknown fingerprint: nothing cached yet.
    if ja3_hash not in ja3_fps:
        return True

    # Cached and still fresh.
    if ja3_fps[ja3_hash].isTooOld() is not True:
        return False

    print('Fingerprint {} too old, re-newing..'.format(ja3_hash))
    return True


def getInfoFromJA3ER(ja3_hash):
    """Query ja3er.com for ``ja3_hash``, cache the answer and print it.

    A response with HTTP status 200 is decoded as JSON, wrapped in a JA3ER
    object and stored in ``ja3_fps`` under ``ja3_hash``.  If the decoded
    data contains 'error', a "not found" message is printed; otherwise each
    element is printed as a User-Agent record, a Comment record, or as-is.

    Request failures, other status codes and undecodable bodies leave
    ``ja3_fps`` untouched; failures are reported on stderr instead of
    raising, because this runs inside the socket loop callback.
    """
    try:
        # Bound the request so one slow lookup cannot stall event handling.
        response = requests.get('https://ja3er.com/search/' + ja3_hash,
                                timeout=10)
    except requests.exceptions.RequestException as err:
        sys.stderr.write('JA3ER lookup for {} failed: {}\n'
                         .format(ja3_hash, err))
        return
    if response.status_code != 200:
        return

    try:
        result = json.loads(response.text, strict=True)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError subclass.
        sys.stderr.write('JA3ER response for {} is not valid JSON: {}\n'
                         .format(ja3_hash, err))
        return

    ja3_fps[ja3_hash] = JA3ER(result)
    if 'error' in result:
        print('No fingerprint for JA3 {} found.'.format(ja3_hash))
        return

    print('Fingerprints for JA3 {}:'.format(ja3_hash))
    for ua in result:
        if 'User-Agent' in ua:
            print('\tUser-Agent: {}\n'
                  '\t            Last seen: {}, '
                  'Count: {}'.format(ua['User-Agent'],
                                     ua['Last_seen'],
                                     ua['Count']))
        elif 'Comment' in ua:
            # Collapse line breaks so a comment stays on one output line.
            print('\tComment...: {}\n'
                  '\t            Reported: {}'
                  .format(ua['Comment'].replace('\r', '')
                          .replace('\n', ' '), ua['Reported']))
        else:
            print(ua)


def onJsonLineRecvd(json_dict, current_flow, global_user_data):
    """Handle one decoded JSON event and process its JA3 fingerprints.

    Events without a 'tls' object containing 'ja3' are ignored, as are
    flows whose requested server name is 'ja3er.com' (this avoids reacting
    to the lookups this script performs itself).  Otherwise the client
    hash ('ja3') and, when present, the server hash ('ja3s') are looked up
    if they are not cached or their cache entry is too old, and the client
    hash is checked against the blacklist.

    Args:
        json_dict: the decoded event.
        current_flow: unused.
        global_user_data: unused.

    Returns:
        True in every case.
    """
    if 'tls' not in json_dict or 'ja3' not in json_dict['tls']:
        return True

    tls = json_dict['tls']
    # The server name is optional; do not fail on events that lack it.
    if tls.get('client_requested_server_name') == 'ja3er.com':
        return True

    ja3 = tls['ja3']
    if isJA3InfoTooOld(ja3):
        getInfoFromJA3ER(ja3)

    # Check the age of the server hash itself: testing the client hash a
    # second time would skip this lookup whenever the one above succeeded.
    ja3s = tls.get('ja3s')
    if ja3s and isJA3InfoTooOld(ja3s):
        getInfoFromJA3ER(ja3s)

    checkBlacklisted(ja3)

    return True


if __name__ == '__main__':
    # Command line handling is delegated to the nDPIsrvd helper module.
    argparser = nDPIsrvd.defaultArgumentParser()
    args = argparser.parse_args()
    address = nDPIsrvd.validateAddress(args)

    sys.stderr.write(
        'Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE)
    )
    # A tuple address is shown as "host:port", anything else verbatim.
    if type(address) is tuple:
        target = address[0] + ':' + str(address[1])
    else:
        target = address
    sys.stderr.write('Connecting to {} ..\n'.format(target))

    # A missing blacklist is not fatal; JA3ER lookups still work.
    if downloadJA3Blacklist() is False:
        print('Could not download JA3 blacklist.')

    nsock = nDPIsrvdSocket()
    nsock.connect(address)
    try:
        # Feed every received JSON line to onJsonLineRecvd.
        nsock.loop(onJsonLineRecvd, None)
    except nDPIsrvd.SocketConnectionBroken as err:
        sys.stderr.write('\n{}\n'.format(err))
    except KeyboardInterrupt:
        # Finish the terminal line after ^C.
        print()