diff --git a/changelogs/fragments/rule_pass_before_deny_ordering.yml b/changelogs/fragments/rule_pass_before_deny_ordering.yml new file mode 100644 index 00000000..1752d3d2 --- /dev/null +++ b/changelogs/fragments/rule_pass_before_deny_ordering.yml @@ -0,0 +1,2 @@ +bugfixes: + - pfsense_rule - New pass/match rules without explicit ``after`` or ``before`` are now inserted before the first block/reject rule on the same interface, preserving correct allow-before-deny ordering. diff --git a/changelogs/fragments/rule_protocol_any_with_ports.yml b/changelogs/fragments/rule_protocol_any_with_ports.yml new file mode 100644 index 00000000..85cc1eff --- /dev/null +++ b/changelogs/fragments/rule_protocol_any_with_ports.yml @@ -0,0 +1,2 @@ +bugfixes: + - pfsense_rule - Allow protocol ``any`` with destination/source ports, matching pfSense UI behaviour. diff --git a/plugins/module_utils/rule.py b/plugins/module_utils/rule.py index 44f19b55..451ace7b 100644 --- a/plugins/module_utils/rule.py +++ b/plugins/module_utils/rule.py @@ -118,8 +118,8 @@ def _params_to_obj(self): if params.get('destination_port'): self.pfsense.parse_port(params['destination_port'], obj['destination']) - if params['protocol'] not in ['tcp', 'udp', 'tcp/udp'] and ('port' in obj['source'] or 'port' in obj['destination']): - self.module.fail_json(msg="{0}: you can't use ports on protocols other than tcp, udp or tcp/udp".format(self._get_obj_name())) + if params['protocol'] not in ['tcp', 'udp', 'tcp/udp', 'any'] and ('port' in obj['source'] or 'port' in obj['destination']): + self.module.fail_json(msg="{0}: you can't use ports on protocols other than tcp, udp, tcp/udp or any".format(self._get_obj_name())) for param in ['destination', 'source']: if 'address' in obj[param]: @@ -447,6 +447,12 @@ def _get_expected_rule_xml_index(self): found, i = self._find_rule(self.obj['descr']) if found is not None: return i + # For pass/match rules, insert before the first block/reject rule + # on the same interface to maintain correct allow-before-deny ordering. + if self.obj.get('type', 'pass') not in ('block', 'reject'): + idx = self._get_first_deny_rule_xml_index() + if idx is not None: + return idx return self._get_last_rule_xml_index() + 1 return -1 @@ -469,6 +475,18 @@ def _get_last_rule_xml_index(self): i += 1 return last_found + def _get_first_deny_rule_xml_index(self): + """ Find the first block/reject rule for the interface/floating and return its xml index. + Returns None if no block/reject rules exist for this interface. """ + i = 0 + for rule_elt in self.root_elt: + if self._match_interface(rule_elt): + type_elt = rule_elt.find('type') + if type_elt is not None and type_elt.text in ('block', 'reject'): + return i + i += 1 + return None + @staticmethod def _get_params_to_remove(): """ returns the list of params to remove if they are not set """ diff --git a/tests/unit/plugins/modules/fixtures/pfsense_rule_config.xml b/tests/unit/plugins/modules/fixtures/pfsense_rule_config.xml index c869f758..20402f58 100644 --- a/tests/unit/plugins/modules/fixtures/pfsense_rule_config.xml +++ b/tests/unit/plugins/modules/fixtures/pfsense_rule_config.xml @@ -1007,6 +1007,52 @@ pass + + + + + + block_all_lan + + + + + + + + keep state + + + + 1545602800 + lan + inet + block + + + + + + + + reject_all_lan + + + + + + + + keep state + + + + 1545602801 + lan + inet + reject + + diff --git a/tests/unit/plugins/modules/test_pfsense_rule_create.py b/tests/unit/plugins/modules/test_pfsense_rule_create.py index f300e07f..388b72aa 100644 --- a/tests/unit/plugins/modules/test_pfsense_rule_create.py +++ b/tests/unit/plugins/modules/test_pfsense_rule_create.py @@ -278,7 +278,7 @@ def test_rule_create_source_alias_invalid(self): def test_rule_create_invalid_ports(self): """ test creation of a new rule with an invalid use of ports """ obj = dict(name='one_rule', source='192.193.194.195', destination='any:22', interface='lan', protocol='icmp') - msg = "'one_rule' on 'lan': you can't use ports on protocols other than tcp, udp or tcp/udp" + msg = "'one_rule' on 'lan': you can't use ports on protocols other than tcp, udp, tcp/udp or any" self.do_module_test(obj, failed=True, msg=msg) def test_rule_create_source_ip_invalid(self): @@ -652,3 +652,85 @@ def test_rule_create_schedule_invalid(self): obj = dict(name='one_rule', source='any', destination='any', interface='lan', sched='acme') msg = 'Schedule acme does not exist' self.do_module_test(obj, failed=True, msg=msg) + + ################################## + # protocol any with ports + # + def test_rule_create_protocol_any_with_dst_port(self): + """ test creation of a rule with protocol any and destination port """ + obj = dict(name='one_rule', source='any', destination='any:443', interface='lan', protocol='any') + command = "create rule 'one_rule' on 'lan', source='any', destination='any:443'" + self.do_module_test(obj, command=command) + + def test_rule_create_protocol_icmp_with_dst_port(self): + """ test creation of a rule with protocol icmp and destination port should fail """ + obj = dict(name='one_rule', source='any', destination='any:443', interface='lan', protocol='icmp') + msg = "'one_rule' on 'lan': you can't use ports on protocols other than tcp, udp, tcp/udp or any" + self.do_module_test(obj, failed=True, msg=msg) + + ################################## + # pass rule ordering (insert before first block/reject) + # + def test_rule_create_pass_before_block(self): + """ test that a new pass rule is inserted before the first block rule on the same interface """ + obj = dict(name='new_pass_rule', source='any', destination='any', interface='lan') + command = "create rule 'new_pass_rule' on 'lan', source='any', destination='any'" + self.do_module_test(obj, command=command) + # Verify the new pass rule appears before block_all_lan in the XML + self.load_xml_result() + filter_elt = self.xml_result.find('filter') + rules = [] + for rule_elt in filter_elt.findall('rule'): + iface_elt = rule_elt.find('interface') + descr_elt = rule_elt.find('descr') + if iface_elt is not None and iface_elt.text == 'lan' and descr_elt is not None: + rules.append(descr_elt.text) + self.assertIn('new_pass_rule', rules) + self.assertIn('block_all_lan', rules) + pass_idx = rules.index('new_pass_rule') + block_idx = rules.index('block_all_lan') + self.assertLess(pass_idx, block_idx, "pass rule should be positioned before block rule") + + def test_rule_create_block_appends_to_end(self): + """ test that a new block rule appends to the end (after existing block rules) """ + obj = dict(name='new_block_rule', source='any', destination='any', interface='lan', action='block') + command = "create rule 'new_block_rule' on 'lan', source='any', destination='any', action='block'" + self.do_module_test(obj, command=command) + # Verify the new block rule appears after block_all_lan + self.load_xml_result() + filter_elt = self.xml_result.find('filter') + rules = [] + for rule_elt in filter_elt.findall('rule'): + iface_elt = rule_elt.find('interface') + descr_elt = rule_elt.find('descr') + if iface_elt is not None and iface_elt.text == 'lan' and descr_elt is not None: + rules.append(descr_elt.text) + self.assertIn('new_block_rule', rules) + self.assertIn('block_all_lan', rules) + new_idx = rules.index('new_block_rule') + existing_idx = rules.index('block_all_lan') + self.assertGreater(new_idx, existing_idx, "new block rule should append after existing block rule") + + def test_rule_create_pass_before_reject(self): + """ test that a new pass rule is inserted before a reject rule, not just block """ + # Fixture has both block_all_lan and reject_all_lan on lan. + # A new pass rule should be inserted before both. + obj = dict(name='new_pass_above_reject', source='any', destination='any:80', interface='lan', protocol='tcp') + command = "create rule 'new_pass_above_reject' on 'lan', source='any', destination='any:80', protocol='tcp'" + self.do_module_test(obj, command=command) + self.load_xml_result() + filter_elt = self.xml_result.find('filter') + rules = [] + for rule_elt in filter_elt.findall('rule'): + iface_elt = rule_elt.find('interface') + descr_elt = rule_elt.find('descr') + if iface_elt is not None and iface_elt.text == 'lan' and descr_elt is not None: + rules.append(descr_elt.text) + self.assertIn('new_pass_above_reject', rules) + self.assertIn('block_all_lan', rules) + self.assertIn('reject_all_lan', rules) + pass_idx = rules.index('new_pass_above_reject') + block_idx = rules.index('block_all_lan') + reject_idx = rules.index('reject_all_lan') + self.assertLess(pass_idx, block_idx, "pass rule should be before block rule") + self.assertLess(pass_idx, reject_idx, "pass rule should be before reject rule")