-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathFileRequestPlugin.py
282 lines (246 loc) · 10.5 KB
/
FileRequestPlugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
from Plugin import PluginManager
from Config import config
from Crypt import CryptBitcoin as Crypt
from util import SafeRe
import json
import time
import gevent
import hashlib
from .p2putil import getWebsockets
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
# Re-broadcast to neighbour peers
def actionPeerBroadcast(self, params):
gevent.spawn(self.handlePeerBroadcast, params)
def handlePeerBroadcast(self, params):
ip = "%s:%s" % (self.connection.ip, self.connection.port)
if "trace" in params:
params["trace"].append(ip)
raw = json.loads(params["raw"])
res, signature_address, cert, msg_hash = self.peerCheckMessage(raw, params, ip)
if not res:
return
self.response({
"ok": "thx"
})
site = self.sites.get(raw["site"])
websockets = getWebsockets(site)
if websockets:
# Wait for result (valid/invalid)
site.p2p_result[msg_hash] = gevent.event.AsyncResult()
# Send to WebSocket
for ws in websockets:
ws.cmd("peerReceive", {
"ip": ip,
"hash": msg_hash,
"message": raw["message"],
"signed_by": signature_address,
"cert": cert,
"site": raw["site"],
"broadcast": True,
"trace": params.get("trace"),
"timestamp": raw.get("timestamp")
})
# Maybe active filter will reply?
if websockets:
# Wait for p2p_result
result = site.p2p_result[msg_hash].get()
del site.p2p_result[msg_hash]
if not result:
self.connection.badAction(10)
return
# Save to cache
if not websockets and raw["immediate"]:
site.p2p_unread.append({
"ip": "%s:%s" % (self.connection.ip, self.connection.port),
"hash": msg_hash,
"message": raw["message"],
"signed_by": signature_address,
"cert": cert,
"site": raw["site"],
"broadcast": True,
"trace": params.get("trace"),
"timestamp": raw.get("timestamp")
})
# Get peer list
peers = site.getConnectedPeers()
if len(peers) < raw["peer_count"]: # Add more, non-connected peers if necessary
peers += site.getRecentPeers(raw["peer_count"] - len(peers))
# Send message to neighbour peers
for peer in peers:
gevent.spawn(peer.request, "peerBroadcast", params)
# Receive by-ip messages
def actionPeerSend(self, params):
gevent.spawn(self.handlePeerSend, params)
def handlePeerSend(self, params):
ip = "%s:%s" % (self.connection.ip, self.connection.port)
raw = json.loads(params["raw"])
res, signature_address, cert, msg_hash = self.peerCheckMessage(raw, params, ip)
if not res:
return
self.response({
"ok": "thx"
})
site = self.sites.get(raw["site"])
if "to" in raw:
# This is a reply to peerSend
site.p2p_to[raw["to"]].set({
"hash": msg_hash,
"message": raw["message"],
"signed_by": signature_address,
"cert": cert,
"timestamp": raw.get("timestamp")
})
else:
# Broadcast
websockets = getWebsockets(site)
if websockets:
# Wait for result (valid/invalid)
site.p2p_result[msg_hash] = gevent.event.AsyncResult()
for ws in websockets:
ws.cmd("peerReceive", {
"ip": ip,
"hash": msg_hash,
"message": raw["message"],
"signed_by": signature_address,
"cert": cert,
"site": raw["site"],
"broadcast": False,
"timestamp": raw.get("timestamp")
})
# Maybe active filter will reply?
if websockets:
# Wait for p2p_result
result = site.p2p_result[msg_hash].get()
del site.p2p_result[msg_hash]
if not result:
self.connection.badAction(10)
# Save to cache
if not websockets and raw["immediate"]:
site.p2p_unread.append({
"ip": ip,
"hash": msg_hash,
"message": raw["message"],
"signed_by": signature_address,
"cert": cert,
"site": raw["site"],
"broadcast": False,
"timestamp": raw.get("timestamp")
})
def peerCheckMessage(self, raw, params, ip):
# Calculate hash from nonce
msg_hash = hashlib.sha256(("%s,%s" % (params["nonce"], params["raw"])).encode("ascii")).hexdigest()
# Check that p2p.json exists
site = self.sites.get(raw["site"])
if not site.storage.isFile("p2p.json"):
self.connection.log("Site %s doesn't support P2P messages" % raw["site"])
self.connection.badAction(5)
self.response({
"error": "Site %s doesn't support P2P messages" % raw["site"]
})
return False, "", None, msg_hash
# Check whether P2P messages are supported
p2p_json = site.storage.loadJson("p2p.json")
if "filter" not in p2p_json:
self.connection.log("Site %s doesn't support P2P messages" % raw["site"])
self.connection.badAction(5)
self.response({
"error": "Site %s doesn't support P2P messages" % raw["site"]
})
return False, "", None, msg_hash
# Was the message received yet?
if msg_hash in site.p2p_received:
self.response({
"warning": "Already received, thanks"
})
return False, "", None, msg_hash
site.p2p_received.append(msg_hash)
# Check whether the message matches passive filter
if not SafeRe.match(p2p_json["filter"], json.dumps(raw["message"])):
self.connection.log("Invalid message for site %s: %s" % (raw["site"], raw["message"]))
self.connection.badAction(5)
self.response({
"error": "Invalid message for site %s: %s" % (raw["site"], raw["message"])
})
return False, "", None, msg_hash
# Not so fast
if "freq_limit" in p2p_json and time.time() - site.p2p_last_recv.get(ip, 0) < p2p_json["freq_limit"]:
self.connection.log("Too fast messages from %s" % raw["site"])
self.connection.badAction(2)
self.response({
"error": "Too fast messages from %s" % raw["site"]
})
return False, "", None, msg_hash
site.p2p_last_recv[ip] = time.time()
# Not so much
if "size_limit" in p2p_json and len(json.dumps(raw["message"])) > p2p_json["size_limit"]:
self.connection.log("Too big message from %s" % raw["site"])
self.connection.badAction(7)
self.response({
"error": "Too big message from %s" % raw["site"]
})
return False, "", None, msg_hash
# Verify signature
if params["signature"]:
signature_address, signature = params["signature"].split("|")
what = "%s|%s|%s" % (signature_address, msg_hash, params["raw"])
if not Crypt.verify(what, signature_address, signature):
self.connection.log("Invalid signature")
self.connection.badAction(7)
self.response({
"error": "Invalid signature"
})
return False, "", None, msg_hash
# Now check auth providers
if params.get("cert"):
# Read all info
cert_auth_type, cert_auth_user_name, cert_issuer, cert_sign = map(
lambda b: b.decode("ascii") if isinstance(b, bytes) else b,
params["cert"]
)
# This is what certificate issuer signs
cert_subject = "%s#%s/%s" % (signature_address, cert_auth_type, cert_auth_user_name)
# Now get cert issuer address
cert_signers = p2p_json.get("cert_signers", {})
cert_addresses = cert_signers.get(cert_issuer, [])
# And verify it
if not Crypt.verify(cert_subject, cert_addresses, cert_sign):
self.connection.log("Invalid signature certificate")
self.connection.badAction(7)
self.response({
"error": "Invalid signature certificate"
})
return False, "", None, msg_hash
# And save the ID
cert = "%s/%s@%s" % (cert_auth_type, cert_auth_user_name, cert_issuer)
else:
# Old-style sign
cert = ""
else:
signature_address = ""
cert = ""
# Check that the signature address is correct
if "signed_only" in p2p_json:
valid = p2p_json["signed_only"]
if valid is True and not signature_address:
self.connection.log("Not signed message")
self.connection.badAction(5)
self.response({
"error": "Not signed message"
})
return False, "", None, msg_hash
elif isinstance(valid, str) and signature_address != valid:
self.connection.log("Message signature is invalid: %s not in [%r]" % (signature_address, valid))
self.connection.badAction(5)
self.response({
"error": "Message signature is invalid: %s not in [%r]" % (signature_address, valid)
})
return False, "", None, msg_hash
elif isinstance(valid, list) and signature_address not in valid:
self.connection.log("Message signature is invalid: %s not in %r" % (signature_address, valid))
self.connection.badAction(5)
self.response({
"error": "Message signature is invalid: %s not in %r" % (signature_address, valid)
})
return False, "", None, msg_hash
return True, signature_address, cert, msg_hash