Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

from . import db, base62 

from .models import (Domain, DomainForm, DynDNSClient, Record, RecordForm, DomainMeta, TsigKey, 

TsigKeyForm) 

 

from flask import abort, redirect, render_template, flash, request, Blueprint 

from flask.views import MethodView 

from logging import getLogger 

from itsdangerous import constant_time_compare 

import os 

 

# Logger used by every view in this module (logger name: 'poff.views').
_logger = getLogger('poff.views')

# Blueprint that all routes and class-based views below are registered on.
mod = Blueprint('views', __name__)

 

 

@mod.context_processor
def default_context():
    """Build the default template context registered on the `mod` blueprint.

    Returns:
        dict: ``domains`` (all `Domain` rows ordered by name) together with
        fresh ``tsigkeyform``, ``domainform`` and ``recordform`` instances.
    """
    # Run the query first, then instantiate the forms, in the same order as the keys below.
    all_domains = Domain.query.order_by(Domain.name).all()
    return dict(
        domains=all_domains,
        tsigkeyform=TsigKeyForm(),
        domainform=DomainForm(),
        recordform=RecordForm(),
    )

 

 

@mod.route('/')
def main():
    """Serve the index route by rendering the `domains.html` template."""
    page = render_template('domains.html')
    return page

 

 

@mod.route('/domains', methods=['POST'])
def domains():
    """Create a new domain from the submitted `DomainForm`.

    When the form validates, the domain is added to the session together with:

    * an SOA record (serial refreshed through `Record.update_serial`),
    * a TXT record with content ``v=spf1 -all``,
    * `DomainMeta` rows of kind ``SOA-EDIT``, ``NSEC3NARROW`` and ``NSEC3PARAMS``.

    Returns:
        A redirect to ``/`` on success, or the re-rendered `domains.html`
        template (with the invalid form) and status 400 on validation failure.
    """
    # Imported locally so this block does not depend on a file-level import.
    import binascii

    form = DomainForm()
    if not form.validate_on_submit():
        flash('Failed to validate new domain, check the errors in the form below', 'error')
        _logger.debug('New domain failed form validation.')
        return render_template('domains.html', domainform=form), 400

    domain = Domain()
    form.populate_obj(domain)

    soa_record = Record(
        content='%(domain)s hostmaster.%(domain)s 1970010100' % {
            'domain': domain.name,
        },
        domain=domain,
        type='SOA',
        name=domain.name,
    )
    # Replace the placeholder serial in the content above.
    soa_record.update_serial()

    spf_record = Record(content='v=spf1 -all', domain=domain, type='TXT',
                        name=domain.name)

    db.session.add(domain)
    db.session.add(soa_record)
    db.session.add(spf_record)
    db.session.add(DomainMeta(domain=domain, kind='SOA-EDIT', content='INCEPTION-INCREMENT'))
    db.session.add(DomainMeta(domain=domain, kind='NSEC3NARROW', content='1'))

    # 16 random bytes rendered as 32 hex characters. `bytes.encode('hex')` only
    # exists on Python 2, so use binascii, which behaves the same on 2 and 3.
    # NOTE(review): the last field is presumably the NSEC3 salt -- confirm
    # against the PowerDNS NSEC3PARAMS documentation.
    random_hex = binascii.hexlify(os.urandom(16)).decode('ascii')
    nsec3params = '1 0 1 %s' % random_hex
    db.session.add(DomainMeta(domain=domain, kind='NSEC3PARAMS', content=nsec3params))

    _logger.info('New domain saved: %s', domain.name)
    flash('New domain added successfully!', 'success')
    return redirect('/')

 

 

class MethodOverrideView(MethodView):
    """ Helper class that rewrites POST requests with a `_method` param to the given HTTP method.

    Subclasses implement lower-case handler methods (`get`, `post`, `delete`, ...).
    A form field named `_method` may select a different handler than the one
    implied by the actual request method, as long as the value is listed in
    `allowed_methods` and the view implements it.
    """

    # HTTP verbs that a `_method` form field is allowed to select.
    allowed_methods = frozenset([
        'GET',
        'HEAD',
        'POST',
        'DELETE',
        'PUT',
        'PATCH',
        'OPTIONS'
    ])

    # Make sure POST requests always are allowed to this view
    methods = ['POST']

    def dispatch_request(self, *args, **kwargs):
        """ Find the target HTTP method and execute it.

        Positional and keyword arguments are passed through unchanged to the
        selected handler, whose return value is returned.

        Aborts with 405 when neither the request method nor the `_method`
        override maps to a handler implemented by this view.
        """
        meth = getattr(self, request.method.lower(), None)

        # if the request method is HEAD and we don't have a handler for it retry with GET
        if meth is None and request.method == 'HEAD':
            meth = getattr(self, 'get', None)

        # If an override is desired, load that if we have it
        method_override = request.form.get('_method', '').upper()
        if (method_override in MethodOverrideView.allowed_methods
                and hasattr(self, method_override.lower())):
            meth = getattr(self, method_override.lower())

        # A plain POST to a view without a `post` handler reaches this point
        # because `methods` always admits POST. Answer 405 explicitly instead
        # of relying on an assert, which is stripped under `python -O`.
        if meth is None:
            abort(405)
        return meth(*args, **kwargs)

 

 

class DomainView(MethodOverrideView):
    """Class-based view for a single domain, addressed by its id."""

    def delete(self, domain_id):
        """Delete the domain with id `domain_id` (404 if it does not exist).

        Logs and flashes the deletion, then redirects to ``/``.
        """
        doomed = Domain.query.get_or_404(domain_id)
        db.session.delete(doomed)

        domain_name = doomed.name
        _logger.info("Deleting domain %s", domain_name)
        flash('Domain %s deleted successfully' % domain_name, 'info')
        return redirect('/')

 

 

class RecordView(MethodOverrideView):
    """Class-based view for modifying or deleting a single record by id."""

    def post(self, record_id):
        """Apply the submitted `RecordForm` to the record with id `record_id`.

        Responds 404 when the record does not exist. On validation failure the
        `domains.html` template is re-rendered with the form and its errors and
        status 400; on success the owning domain's SOA is updated and the
        client is redirected to ``/``.
        """
        record = Record.query.get_or_404(record_id)
        form = RecordForm()

        # Handle the failure path first so the success path reads straight through.
        if not form.validate_on_submit():
            _logger.info('Record modification failed form validation: %s', form.errors)
            flash('Failed to validate record modifications', 'warning')
            return render_template(
                'domains.html',
                form_errors=form.errors,
                recordform=form,
            ), 400

        form.populate_obj(record)
        record.domain.update_soa()
        _logger.info('Record %s modified', record.name)
        flash('Record successfully modified.', 'success')
        return redirect('/')

    def delete(self, record_id):
        """Delete the record with id `record_id` (404 if it does not exist).

        The owning domain's SOA is updated before the record is removed from
        the session; afterwards the client is redirected to ``/``.
        """
        doomed = Record.query.get_or_404(record_id)
        doomed.domain.update_soa()
        db.session.delete(doomed)

        record_name = doomed.name
        _logger.info("Deleting record %s", record_name)
        flash('Record %s deleted successfully.' % record_name, 'success')
        return redirect('/')

 

 

@mod.route('/domains/<int:domain_id>/new_record', methods=['POST'])
def new_record(domain_id):
    """Create a record under the domain with id `domain_id` from a `RecordForm`.

    Responds 404 for an unknown domain and 400 (re-rendering `domains.html`
    with the form) when validation fails; otherwise adds the record, updates
    the domain's SOA and redirects to ``/``.
    """
    domain = Domain.query.get_or_404(domain_id)
    form = RecordForm()

    if not form.validate_on_submit():
        _logger.debug('Record failed form validation')
        flash('Failed to validate new record, check the errors in the form below!', 'warning')
        return render_template('domains.html', recordform=form), 400

    record = Record()
    record.domain = domain
    form.populate_obj(record)

    # Ensure MX records always has a prio
    if record.type == 'MX':
        record.prio = record.prio if record.prio else 0

    domain.update_soa()
    db.session.add(record)
    _logger.info('New record saved: %s', record.name)
    flash('New record saved successfully!', 'success')
    return redirect('/')

 

 

@mod.route('/domains/<int:domain_id>/tsigkeys', methods=['POST'])
def domain_tsigkeys(domain_id):
    """Create a TSIG key from a `TsigKeyForm` and attach it to a domain.

    Responds 404 for an unknown domain. When the form validates, a `TsigKey`
    and a `DomainMeta` row of kind ``TSIG-ALLOW-DNSUPDATE`` (content: the key
    name) are added to the session and the client is redirected to ``/``.
    Otherwise `domains.html` is re-rendered with the form and status 400.
    """
    domain = Domain.query.get_or_404(domain_id)
    form = TsigKeyForm()

    if form.validate_on_submit():
        new_key = TsigKey()
        form.populate_obj(new_key)

        # The meta row is what links the key (by name) to this domain.
        key_link = DomainMeta(
            domain=domain,
            kind='TSIG-ALLOW-DNSUPDATE',
            content=new_key.name,
        )

        db.session.add(new_key)
        db.session.add(key_link)

        _logger.info('Added tsigkey for for "%s"', domain.name)
        flash('New dyndns key added successfully!', 'success')
        return redirect('/')

    _logger.debug('TsigKey failed form validation')
    flash('Failed to validate new DynDNS key', 'warning')
    return render_template('domains.html', tsigkeyform=form), 400

 

 

class TsigKeyView(MethodOverrideView):
    """Class-based view for a TSIG key attached to a domain."""

    def delete(self, domain_id, tsig_name):
        """Delete the TSIG key `tsig_name` and its link to domain `domain_id`.

        Responds 404 when the domain does not exist, when no `TsigKey` has
        that name, or when the key is not attached to this domain through a
        ``TSIG-ALLOW-DNSUPDATE`` `DomainMeta` row. Nothing is deleted in any
        of those cases. On success the client is redirected to ``/``.
        """
        # Only used as an existence check; the queries below filter by domain_id.
        Domain.query.get_or_404(domain_id)

        tsigkey = TsigKey.query.filter_by(name=tsig_name).first()
        if tsigkey is None:
            abort(404)

        tsig_meta = DomainMeta.query.filter_by(domain_id=domain_id, kind='TSIG-ALLOW-DNSUPDATE',
                                               content=tsig_name).first()
        # The key exists but is not linked to this domain. Previously this
        # passed None to `db.session.delete` and raised; treat it as not found.
        if tsig_meta is None:
            abort(404)

        db.session.delete(tsigkey)
        db.session.delete(tsig_meta)

        _logger.info('DynDNS key %s deleted', tsigkey.name)
        flash('DynDNS key %s deleted' % tsigkey.name, 'success')
        return redirect('/')

 

 

@mod.route('/records/<int:record_id>/new-dyndns-client', methods=['POST'])
def new_dyndns_client(record_id):
    """Create a `DynDNSClient` for the record with id `record_id`.

    Responds 404 when the record does not exist; otherwise adds the new client
    to the session, logs and flashes the creation, and redirects to ``/``.
    """
    target_record = Record.query.get_or_404(record_id)
    db.session.add(DynDNSClient(record=target_record))

    _logger.info('New DynDNS client created for record %s', target_record.name)
    flash('New DynDNS client created!', 'success')
    return redirect('/')

 

 

@mod.route('/records/<int:record_id>/rekey', methods=['POST'])
def rekey_dyndns_record(record_id):
    """Give the DynDNS client of record `record_id` a new key.

    Responds 404 when the record does not exist or has no DynDNS client;
    otherwise calls `set_new_key()` on the client and redirects to ``/``.
    """
    record = Record.query.get(record_id)
    # A missing record and a record without a client are both treated as "not found".
    client = record.dyndns_client if record else None
    if not client:
        abort(404)

    client.set_new_key()
    return redirect('/')

 

 

@mod.route('/update-record', methods=['POST'])
def update_record():
    """Point an A record at the caller's IP, authenticated by its DynDNS key.

    Form fields read: ``record`` (record name) and ``key`` (client key).

    Responses:
        404 -- no A record with that name, or it has no DynDNS client.
        403 -- the submitted key does not match the client's base62-encoded key.
        201 -- the record content was changed to the caller's IP (empty body).
        200 -- the record already held that IP (empty body).
    """
    record_name = request.form.get('record')
    record = Record.query.filter_by(name=record_name, type='A').first()
    if not (record and record.dyndns_client):
        abort(404)

    submitted_key = str(request.form.get('key', ''))
    expected_key = base62.encode(record.dyndns_client.key)
    if not constant_time_compare(submitted_key, expected_key):
        _logger.warning('Bad auth for trying to update record %s', record.name)
        abort(403)

    # First entry of the access route; strip a leading '::ffff:' so an
    # IPv4-mapped IPv6 address is stored as a plain IPv4 address.
    mapped_prefix = '::ffff:'
    origin_ip = request.access_route[0]
    if origin_ip.startswith(mapped_prefix):
        origin_ip = origin_ip[len(mapped_prefix):]

    if record.content == origin_ip:
        flash('Still on the same IP, no change applied', 'success')
        return ''

    _logger.info('Updating record %s to %s', record.name, origin_ip)
    flash('Successfully updated record to new IP: %s' % origin_ip, 'success')
    record.content = origin_ip
    record.domain.update_soa()
    return '', 201

 

 

class DynDNSClientView(MethodOverrideView):
    """Class-based view for a single DynDNS client, addressed by its id."""

    def delete(self, client_id):
        """Delete the DynDNS client with id `client_id` (404 if it does not exist).

        Logs the name of the client's record, flashes a confirmation and
        redirects to ``/``.
        """
        doomed = DynDNSClient.query.get_or_404(client_id)
        db.session.delete(doomed)

        record_name = doomed.record.name
        _logger.info('DynDNS client for record %s deleted.', record_name)
        flash('DynDNS client deleted!')
        return redirect('/')

 

 

def _register_class_based_views(blueprint):
    """Attach each class-based view to its URL rule on `blueprint`, in order."""
    # (URL rule, view class, endpoint name)
    routes = (
        ('/domains/<int:domain_id>', DomainView, 'domain_details'),
        ('/records/<int:record_id>', RecordView, 'record_details'),
        ('/dyndns-clients/<int:client_id>', DynDNSClientView, 'client_details'),
        ('/domains/<int:domain_id>/tsigkeys/<tsig_name>', TsigKeyView, 'tsigkey_details'),
    )
    for rule, view_cls, endpoint in routes:
        blueprint.add_url_rule(rule, view_func=view_cls.as_view(endpoint))


_register_class_based_views(mod)