You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1093 lines
42 KiB
1093 lines
42 KiB
#!/usr/bin/python
|
|
#
|
|
# Copyright 2017 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
|
|
from errno import * # pylint: disable=wildcard-import
|
|
from socket import * # pylint: disable=wildcard-import
|
|
|
|
import random
|
|
import itertools
|
|
import struct
|
|
import unittest
|
|
|
|
from scapy import all as scapy
|
|
from tun_twister import TunTwister
|
|
import csocket
|
|
import iproute
|
|
import multinetwork_base
|
|
import net_test
|
|
import packets
|
|
import util
|
|
import xfrm
|
|
import xfrm_base
|
|
|
|
_LOOPBACK_IFINDEX = 1
|
|
_TEST_XFRM_IFNAME = "ipsec42"
|
|
_TEST_XFRM_IF_ID = 42
|
|
_TEST_SPI = 0x1234
|
|
|
|
# Does the kernel support xfrmi interfaces?
|
|
def HaveXfrmInterfaces():
|
|
if net_test.LINUX_VERSION >= (4, 19, 0):
|
|
return True
|
|
|
|
try:
|
|
i = iproute.IPRoute()
|
|
i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
|
|
_LOOPBACK_IFINDEX)
|
|
i.DeleteLink(_TEST_XFRM_IFNAME)
|
|
try:
|
|
i.GetIfIndex(_TEST_XFRM_IFNAME)
|
|
assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME
|
|
except IOError:
|
|
pass
|
|
return True
|
|
except IOError:
|
|
return False
|
|
|
|
HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()
|
|
|
|
# Does the kernel support CONFIG_XFRM_MIGRATE?
|
|
def SupportsXfrmMigrate():
|
|
if net_test.LINUX_VERSION >= (5, 10, 0):
|
|
return True
|
|
|
|
# XFRM_MIGRATE depends on xfrmi interfaces
|
|
if not HAVE_XFRM_INTERFACES:
|
|
return False
|
|
|
|
try:
|
|
x = xfrm.Xfrm()
|
|
wildcard_addr = net_test.GetWildcardAddress(6)
|
|
selector = xfrm.EmptySelector(AF_INET6)
|
|
|
|
# Expect migration to fail with EINVAL because it is trying to migrate a
|
|
# non-existent SA.
|
|
x.MigrateTunnel(xfrm.XFRM_POLICY_OUT, selector, wildcard_addr, wildcard_addr,
|
|
wildcard_addr, wildcard_addr, _TEST_SPI,
|
|
None, None, None, None, None, None)
|
|
print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled")
|
|
return True
|
|
except IOError as err:
|
|
if err.errno == ENOPROTOOPT:
|
|
return False
|
|
elif err.errno == EINVAL:
|
|
return True
|
|
else:
|
|
print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno)
|
|
return True
|
|
|
|
SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate()
|
|
|
|
# Parameters to setup tunnels as special networks
|
|
_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService
|
|
_BASE_TUNNEL_NETID = {4: 40, 6: 60}
|
|
_BASE_VTI_OKEY = 2000000100
|
|
_BASE_VTI_IKEY = 2000000200
|
|
|
|
_TEST_OUT_SPI = _TEST_SPI
|
|
_TEST_IN_SPI = _TEST_OUT_SPI
|
|
|
|
_TEST_OKEY = 2000000100
|
|
_TEST_IKEY = 2000000200
|
|
|
|
_TEST_REMOTE_PORT = 1234
|
|
|
|
_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
|
|
|
|
|
|
def _GetLocalInnerAddress(version):
|
|
return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
|
|
|
|
|
|
def _GetRemoteInnerAddress(version):
|
|
return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version]
|
|
|
|
|
|
def _GetRemoteOuterAddress(version):
|
|
return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
|
|
|
|
|
|
def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
|
|
src_port, dst_inner, dst_outer,
|
|
dst_port, spi, seq_num, ip_hdr_options=None):
|
|
if ip_hdr_options is None:
|
|
ip_hdr_options = {}
|
|
|
|
ip_hdr_options.update({'src': src_inner, 'dst': dst_inner})
|
|
|
|
# Build and receive an ESP packet destined for the inner socket
|
|
IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
|
|
input_pkt = (
|
|
IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) /
|
|
net_test.UDP_PAYLOAD)
|
|
input_pkt = IpType(str(input_pkt)) # Compute length, checksum.
|
|
input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
|
|
(src_outer, dst_outer))
|
|
|
|
return input_pkt
|
|
|
|
|
|
def _CreateReceiveSock(version, port=0):
|
|
# Create a socket to receive packets.
|
|
read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
|
|
read_sock.bind((net_test.GetWildcardAddress(version), port))
|
|
# The second parameter of the tuple is the port number regardless of AF.
|
|
local_port = read_sock.getsockname()[1]
|
|
# Guard against the eventuality of the receive failing.
|
|
csocket.SetSocketTimeout(read_sock, 500)
|
|
|
|
return read_sock, local_port
|
|
|
|
|
|
def _SendPacket(testInstance, netid, version, remote, remote_port):
|
|
# Send a packet out via the tunnel-backed network, bound for the port number
|
|
# of the input socket.
|
|
write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
|
|
testInstance.SelectInterface(write_sock, netid, "mark")
|
|
write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
|
|
local_port = write_sock.getsockname()[1]
|
|
|
|
return local_port
|
|
|
|
|
|
def InjectTests():
|
|
InjectParameterizedTests(XfrmTunnelTest)
|
|
InjectParameterizedTests(XfrmInterfaceTest)
|
|
InjectParameterizedTests(XfrmVtiTest)
|
|
InjectParameterizedTests(XfrmInterfaceMigrateTest)
|
|
|
|
|
|
def InjectParameterizedTests(cls):
|
|
VERSIONS = (4, 6)
|
|
param_list = itertools.product(VERSIONS, VERSIONS)
|
|
|
|
def NameGenerator(*args):
|
|
return "IPv%d_in_IPv%d" % tuple(args)
|
|
|
|
util.InjectParameterizedTest(cls, param_list, NameGenerator)
|
|
|
|
|
|
class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
|
|
|
|
def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
|
|
netid, local_inner, remote_inner, local_outer,
|
|
remote_outer, write_sock):
|
|
|
|
write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
|
|
self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
|
|
local_outer, remote_outer)
|
|
|
|
def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
|
|
netid, local_inner, remote_inner, local_outer,
|
|
remote_outer, read_sock):
|
|
|
|
# The second parameter of the tuple is the port number regardless of AF.
|
|
local_port = read_sock.getsockname()[1]
|
|
|
|
# Build and receive an ESP packet destined for the inner socket
|
|
input_pkt = _GetNullAuthCryptTunnelModePkt(
|
|
inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
|
|
local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
|
|
self.ReceivePacketOn(underlying_netid, input_pkt)
|
|
|
|
# Verify that the packet data and src are correct
|
|
data, src = read_sock.recvfrom(4096)
|
|
self.assertEqual(net_test.UDP_PAYLOAD, data)
|
|
self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
|
|
|
|
def _TestTunnel(self, inner_version, outer_version, func, direction,
|
|
test_output_mark_unset):
|
|
"""Test a unidirectional XFRM Tunnel with explicit selectors"""
|
|
# Select the underlying netid, which represents the external
|
|
# interface from/to which to route ESP packets.
|
|
u_netid = self.RandomNetid()
|
|
# Select a random netid that will originate traffic locally and
|
|
# which represents the netid on which the plaintext is sent
|
|
netid = self.RandomNetid(exclude=u_netid)
|
|
|
|
local_inner = self.MyAddress(inner_version, netid)
|
|
remote_inner = _GetRemoteInnerAddress(inner_version)
|
|
local_outer = self.MyAddress(outer_version, u_netid)
|
|
remote_outer = _GetRemoteOuterAddress(outer_version)
|
|
|
|
output_mark = u_netid
|
|
if test_output_mark_unset:
|
|
output_mark = None
|
|
self.SetDefaultNetwork(u_netid)
|
|
|
|
try:
|
|
# Create input/ouput SPs, SAs and sockets to simulate a more realistic
|
|
# environment.
|
|
self.xfrm.CreateTunnel(
|
|
xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner),
|
|
remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL,
|
|
xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL)
|
|
|
|
self.xfrm.CreateTunnel(
|
|
xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner),
|
|
local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
|
|
xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL)
|
|
|
|
write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
|
|
self.SelectInterface(write_sock, netid, "mark")
|
|
read_sock, _ = _CreateReceiveSock(inner_version)
|
|
|
|
sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
|
|
func(inner_version, outer_version, u_netid, netid, local_inner,
|
|
remote_inner, local_outer, remote_outer, sock)
|
|
finally:
|
|
if test_output_mark_unset:
|
|
self.ClearDefaultNetwork()
|
|
|
|
def ParamTestTunnelInput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
|
|
xfrm.XFRM_POLICY_IN, False)
|
|
|
|
def ParamTestTunnelOutput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
|
|
xfrm.XFRM_POLICY_OUT, False)
|
|
|
|
def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
|
|
xfrm.XFRM_POLICY_OUT, True)
|
|
|
|
|
|
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
|
|
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
|
|
def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
|
|
ikey, okey):
|
|
self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
|
|
self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)
|
|
|
|
family = AF_INET if version == 4 else AF_INET6
|
|
self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]),
|
|
local_addr)
|
|
self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]),
|
|
remote_addr)
|
|
|
|
def testAddVti(self):
|
|
"""Test the creation of a Virtual Tunnel Interface."""
|
|
for version in [4, 6]:
|
|
netid = self.RandomNetid()
|
|
local_addr = self.MyAddress(version, netid)
|
|
self.iproute.CreateVirtualTunnelInterface(
|
|
dev_name=_TEST_XFRM_IFNAME,
|
|
local_addr=local_addr,
|
|
remote_addr=_GetRemoteOuterAddress(version),
|
|
o_key=_TEST_OKEY,
|
|
i_key=_TEST_IKEY)
|
|
self._VerifyVtiInfoData(
|
|
self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
|
|
_GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY)
|
|
|
|
new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
|
|
new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
|
|
new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID
|
|
self.iproute.CreateVirtualTunnelInterface(
|
|
dev_name=_TEST_XFRM_IFNAME,
|
|
local_addr=local_addr,
|
|
remote_addr=new_remote_addr[version],
|
|
o_key=new_okey,
|
|
i_key=new_ikey,
|
|
is_update=True)
|
|
|
|
self._VerifyVtiInfoData(
|
|
self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
|
|
new_remote_addr[version], new_ikey, new_okey)
|
|
|
|
if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
|
|
|
|
# Validate that the netlink interface matches the ioctl interface.
|
|
self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
|
|
self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
|
|
with self.assertRaises(IOError):
|
|
self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
|
|
|
|
def _QuietDeleteLink(self, ifname):
|
|
try:
|
|
self.iproute.DeleteLink(ifname)
|
|
except IOError:
|
|
# The link was not present.
|
|
pass
|
|
|
|
def tearDown(self):
|
|
super(XfrmAddDeleteVtiTest, self).tearDown()
|
|
self._QuietDeleteLink(_TEST_XFRM_IFNAME)
|
|
|
|
|
|
class SaInfo(object):
|
|
|
|
def __init__(self, spi):
|
|
self.spi = spi
|
|
self.seq_num = 1
|
|
|
|
|
|
class IpSecBaseInterface(object):
|
|
|
|
def __init__(self, iface, netid, underlying_netid, local, remote, version):
|
|
self.iface = iface
|
|
self.netid = netid
|
|
self.underlying_netid = underlying_netid
|
|
self.local, self.remote = local, remote
|
|
|
|
# XFRM interfaces technically do not have a version. This keeps track of
|
|
# the IP version of the local and remote addresses.
|
|
self.version = version
|
|
self.rx = self.tx = 0
|
|
self.addrs = {}
|
|
|
|
self.iproute = iproute.IPRoute()
|
|
self.xfrm = xfrm.Xfrm()
|
|
|
|
def Teardown(self):
|
|
self.TeardownXfrm()
|
|
self.TeardownInterface()
|
|
|
|
def TeardownInterface(self):
|
|
self.iproute.DeleteLink(self.iface)
|
|
|
|
def SetupXfrm(self, use_null_crypt):
|
|
rand_spi = random.randint(0, 0x7fffffff)
|
|
self.in_sa = SaInfo(rand_spi)
|
|
self.out_sa = SaInfo(rand_spi)
|
|
|
|
# Select algorithms:
|
|
if use_null_crypt:
|
|
auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
|
|
else:
|
|
auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256
|
|
|
|
self.auth = auth
|
|
self.crypt = crypt
|
|
|
|
self._SetupXfrmByType(auth, crypt)
|
|
|
|
def Rekey(self, outer_family, new_out_sa, new_in_sa):
|
|
"""Rekeys the Tunnel Interface
|
|
|
|
Creates new SAs and updates the outbound security policy to use new SAs.
|
|
|
|
Args:
|
|
outer_family: AF_INET or AF_INET6
|
|
new_out_sa: An SaInfo struct representing the new outbound SA's info
|
|
new_in_sa: An SaInfo struct representing the new inbound SA's info
|
|
"""
|
|
self._Rekey(outer_family, new_out_sa, new_in_sa)
|
|
|
|
# Update Interface object
|
|
self.out_sa = new_out_sa
|
|
self.in_sa = new_in_sa
|
|
|
|
def TeardownXfrm(self):
|
|
raise NotImplementedError("Subclasses should implement this")
|
|
|
|
def _SetupXfrmByType(self, auth_algo, crypt_algo):
|
|
raise NotImplementedError("Subclasses should implement this")
|
|
|
|
def _Rekey(self, outer_family, new_out_sa, new_in_sa):
|
|
raise NotImplementedError("Subclasses should implement this")
|
|
|
|
|
|
class VtiInterface(IpSecBaseInterface):
|
|
|
|
def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
|
|
super(VtiInterface, self).__init__(iface, netid, underlying_netid, local,
|
|
remote, version)
|
|
|
|
self.ikey = _TEST_IKEY + netid
|
|
self.okey = _TEST_OKEY + netid
|
|
|
|
self.SetupInterface()
|
|
self.SetupXfrm(False)
|
|
|
|
def SetupInterface(self):
|
|
return self.iproute.CreateVirtualTunnelInterface(
|
|
self.iface, self.local, self.remote, self.ikey, self.okey)
|
|
|
|
def _SetupXfrmByType(self, auth_algo, crypt_algo):
|
|
# For the VTI, the selectors are wildcard since packets will only
|
|
# be selected if they have the appropriate mark, hence the inner
|
|
# addresses are wildcard.
|
|
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
|
|
self.out_sa.spi, crypt_algo, auth_algo,
|
|
xfrm.ExactMatchMark(self.okey),
|
|
self.underlying_netid, None, xfrm.MATCH_METHOD_ALL)
|
|
|
|
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
|
|
self.in_sa.spi, crypt_algo, auth_algo,
|
|
xfrm.ExactMatchMark(self.ikey), None, None,
|
|
xfrm.MATCH_METHOD_MARK)
|
|
|
|
def TeardownXfrm(self):
|
|
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
|
|
self.out_sa.spi, self.okey, None)
|
|
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
|
|
self.in_sa.spi, self.ikey, None)
|
|
|
|
def _Rekey(self, outer_family, new_out_sa, new_in_sa):
|
|
# TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
|
|
# the same, but rekeys are asymmetric, and only update the outbound
|
|
# policy.
|
|
self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi,
|
|
xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
|
|
xfrm_base._ALGO_AUTH_NULL, None, None,
|
|
xfrm.ExactMatchMark(self.okey), self.underlying_netid)
|
|
|
|
self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi,
|
|
xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL,
|
|
xfrm_base._ALGO_AUTH_NULL, None, None,
|
|
xfrm.ExactMatchMark(self.ikey), None)
|
|
|
|
# Create new policies for IPv4 and IPv6.
|
|
for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]:
|
|
# Add SPI-specific output policy to enforce using new outbound SPI
|
|
policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
|
|
tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
|
|
(self.local, self.remote))
|
|
self.xfrm.UpdatePolicyInfo(policy, tmpl, xfrm.ExactMatchMark(self.okey),
|
|
0)
|
|
|
|
def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
|
|
self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP,
|
|
xfrm.ExactMatchMark(self.ikey))
|
|
self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP,
|
|
xfrm.ExactMatchMark(self.okey))
|
|
|
|
|
|
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
|
|
class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
|
|
"""Test the creation of an XFRM Interface."""
|
|
|
|
def testAddXfrmInterface(self):
|
|
self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
|
|
_LOOPBACK_IFINDEX)
|
|
if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
|
|
net_test.SetInterfaceUp(_TEST_XFRM_IFNAME)
|
|
|
|
# Validate that the netlink interface matches the ioctl interface.
|
|
self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
|
|
self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
|
|
with self.assertRaises(IOError):
|
|
self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
|
|
|
|
|
|
class XfrmInterface(IpSecBaseInterface):
|
|
|
|
def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
|
|
version, use_null_crypt=False):
|
|
super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local,
|
|
remote, version)
|
|
|
|
self.ifindex = ifindex
|
|
self.xfrm_if_id = netid
|
|
|
|
self.SetupInterface()
|
|
self.SetupXfrm(use_null_crypt)
|
|
|
|
def SetupInterface(self):
|
|
"""Create an XFRM interface."""
|
|
return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex)
|
|
|
|
def _SetupXfrmByType(self, auth_algo, crypt_algo):
|
|
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
|
|
self.out_sa.spi, crypt_algo, auth_algo, None,
|
|
self.underlying_netid, self.xfrm_if_id,
|
|
xfrm.MATCH_METHOD_ALL)
|
|
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
|
|
self.in_sa.spi, crypt_algo, auth_algo, None, None,
|
|
self.xfrm_if_id, xfrm.MATCH_METHOD_IFID)
|
|
|
|
def TeardownXfrm(self):
|
|
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
|
|
self.out_sa.spi, None, self.xfrm_if_id)
|
|
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
|
|
self.in_sa.spi, None, self.xfrm_if_id)
|
|
|
|
def _Rekey(self, outer_family, new_out_sa, new_in_sa):
|
|
# TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly
|
|
# the same, but rekeys are asymmetric, and only update the outbound
|
|
# policy.
|
|
self.xfrm.AddSaInfo(
|
|
self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
|
|
xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None,
|
|
None, self.underlying_netid, xfrm_if_id=self.xfrm_if_id)
|
|
|
|
self.xfrm.AddSaInfo(
|
|
self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0,
|
|
xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None,
|
|
None, None, xfrm_if_id=self.xfrm_if_id)
|
|
|
|
# Create new policies for IPv4 and IPv6.
|
|
for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]:
|
|
# Add SPI-specific output policy to enforce using new outbound SPI
|
|
policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
|
|
tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0,
|
|
(self.local, self.remote))
|
|
self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id)
|
|
|
|
def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi):
|
|
self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None,
|
|
self.xfrm_if_id)
|
|
self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None,
|
|
self.xfrm_if_id)
|
|
|
|
def Migrate(self, new_underlying_netid, new_local, new_remote):
|
|
self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
|
|
new_remote, new_local, self.in_sa.spi,
|
|
self.crypt, self.auth, None, None,
|
|
new_underlying_netid, self.xfrm_if_id)
|
|
|
|
self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
|
|
new_local, new_remote, self.out_sa.spi,
|
|
self.crypt, self.auth, None, None,
|
|
new_underlying_netid, self.xfrm_if_id)
|
|
|
|
self.local = new_local
|
|
self.remote = new_remote
|
|
self.underlying_netid = new_underlying_netid
|
|
|
|
|
|
class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
|
|
|
|
# Subclass that does not allow multiple tunnels (e.g. XfrmInterfaceMigrateTest)
|
|
# should override this method.
|
|
@classmethod
|
|
def allowMultipleTunnels(cls):
|
|
return True
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
xfrm_base.XfrmBaseTest.setUpClass()
|
|
# Tunnel interfaces use marks extensively, so configure realistic packet
|
|
# marking rules to make the test representative, make PMTUD work, etc.
|
|
cls.SetInboundMarks(True)
|
|
cls.SetMarkReflectSysctls(1)
|
|
|
|
# Group by tunnel version to ensure that we test at least one IPv4 and one
|
|
# IPv6 tunnel
|
|
cls.tunnelsV4 = {}
|
|
cls.tunnelsV6 = {}
|
|
|
|
if not cls.allowMultipleTunnels():
|
|
return
|
|
|
|
for i, underlying_netid in enumerate(cls.tuns):
|
|
for version in 4, 6:
|
|
netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
|
|
iface = "ipsec%s" % netid
|
|
local = cls.MyAddress(version, underlying_netid)
|
|
if version == 4:
|
|
remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
|
|
else:
|
|
remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
|
|
|
|
ifindex = cls.ifindices[underlying_netid]
|
|
tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
|
|
local, remote, version)
|
|
cls._SetInboundMarking(netid, iface, True)
|
|
cls._SetupTunnelNetwork(tunnel, True)
|
|
|
|
if version == 4:
|
|
cls.tunnelsV4[netid] = tunnel
|
|
else:
|
|
cls.tunnelsV6[netid] = tunnel
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
# The sysctls are restored by MultinetworkBaseTest.tearDownClass.
|
|
cls.SetInboundMarks(False)
|
|
for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
|
|
cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
|
|
cls._SetupTunnelNetwork(tunnel, False)
|
|
tunnel.Teardown()
|
|
xfrm_base.XfrmBaseTest.tearDownClass()
|
|
|
|
def randomTunnel(self, outer_version):
|
|
version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
|
|
return random.choice(list(version_dict.values()))
|
|
|
|
def setUp(self):
|
|
multinetwork_base.MultiNetworkBaseTest.setUp(self)
|
|
self.iproute = iproute.IPRoute()
|
|
self.xfrm = xfrm.Xfrm()
|
|
|
|
def tearDown(self):
|
|
multinetwork_base.MultiNetworkBaseTest.tearDown(self)
|
|
|
|
def _SwapInterfaceAddress(self, ifname, old_addr, new_addr):
|
|
"""Exchange two addresses on a given interface.
|
|
|
|
Args:
|
|
ifname: Name of the interface
|
|
old_addr: An address to be removed from the interface
|
|
new_addr: An address to be added to an interface
|
|
"""
|
|
version = 6 if ":" in new_addr else 4
|
|
ifindex = net_test.GetInterfaceIndex(ifname)
|
|
self.iproute.AddAddress(new_addr,
|
|
net_test.AddressLengthBits(version), ifindex)
|
|
self.iproute.DelAddress(old_addr,
|
|
net_test.AddressLengthBits(version), ifindex)
|
|
|
|
@classmethod
|
|
def _GetLocalAddress(cls, version, netid):
|
|
if version == 4:
|
|
return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
|
|
else:
|
|
return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
|
|
|
|
@classmethod
|
|
def _SetupTunnelNetwork(cls, tunnel, is_add):
|
|
"""Setup rules and routes for a tunnel Network.
|
|
|
|
Takes an interface and depending on the boolean
|
|
value of is_add, either adds or removes the rules
|
|
and routes for a tunnel interface to behave like an
|
|
Android Network for purposes of testing.
|
|
|
|
Args:
|
|
tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
|
|
is_add: Boolean that causes this method to perform setup if True or
|
|
teardown if False
|
|
"""
|
|
if is_add:
|
|
# Disable router solicitations to avoid occasional spurious packets
|
|
# arriving on the underlying network; there are two possible behaviors
|
|
# when that occurred: either only the RA packet is read, and when it
|
|
# is echoed back to the tunnel, it causes the test to fail by not
|
|
# receiving # the UDP_PAYLOAD; or, two packets may arrive on the
|
|
# underlying # network which fails the assertion that only one ESP packet
|
|
# is received.
|
|
cls.SetSysctl(
|
|
"/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
|
|
net_test.SetInterfaceUp(tunnel.iface)
|
|
|
|
for version in [4, 6]:
|
|
ifindex = net_test.GetInterfaceIndex(tunnel.iface)
|
|
table = tunnel.netid
|
|
|
|
# Set up routing rules.
|
|
start, end = cls.UidRangeForNetid(tunnel.netid)
|
|
cls.iproute.UidRangeRule(version, is_add, start, end, table,
|
|
cls.PRIORITY_UID)
|
|
cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
|
|
cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
|
|
table, cls.PRIORITY_FWMARK)
|
|
|
|
# Configure IP addresses.
|
|
addr = cls._GetLocalAddress(version, tunnel.netid)
|
|
prefixlen = net_test.AddressLengthBits(version)
|
|
tunnel.addrs[version] = addr
|
|
if is_add:
|
|
cls.iproute.AddAddress(addr, prefixlen, ifindex)
|
|
cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
|
|
else:
|
|
cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
|
|
cls.iproute.DelAddress(addr, prefixlen, ifindex)
|
|
|
|
def assertReceivedPacket(self, tunnel, sa_info):
|
|
tunnel.rx += 1
|
|
self.assertEqual((tunnel.rx, tunnel.tx),
|
|
self.iproute.GetRxTxPackets(tunnel.iface))
|
|
sa_info.seq_num += 1
|
|
|
|
def assertSentPacket(self, tunnel, sa_info):
|
|
tunnel.tx += 1
|
|
self.assertEqual((tunnel.rx, tunnel.tx),
|
|
self.iproute.GetRxTxPackets(tunnel.iface))
|
|
sa_info.seq_num += 1
|
|
|
|
def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
|
|
sa_info=None, expect_fail=False):
|
|
"""Test null-crypt input path over an IPsec interface."""
|
|
if sa_info is None:
|
|
sa_info = tunnel.in_sa
|
|
read_sock, local_port = _CreateReceiveSock(inner_version)
|
|
|
|
input_pkt = _GetNullAuthCryptTunnelModePkt(
|
|
inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
|
|
local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
|
|
self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
|
|
|
|
if expect_fail:
|
|
self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096)
|
|
else:
|
|
# Verify that the packet data and src are correct
|
|
data, src = read_sock.recvfrom(4096)
|
|
self.assertReceivedPacket(tunnel, sa_info)
|
|
self.assertEqual(net_test.UDP_PAYLOAD, data)
|
|
self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
|
|
|
|
def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
|
|
remote_inner, sa_info=None):
|
|
"""Test null-crypt output path over an IPsec interface."""
|
|
if sa_info is None:
|
|
sa_info = tunnel.out_sa
|
|
local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
|
|
_TEST_REMOTE_PORT)
|
|
|
|
# Read a tunneled IP packet on the underlying (outbound) network
|
|
# verifying that it is an ESP packet.
|
|
pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
|
|
sa_info.seq_num, None, tunnel.local,
|
|
tunnel.remote)
|
|
|
|
# Get and update the IP headers on the inner payload so that we can do a simple
|
|
# comparison of byte data. Unfortunately, due to the scapy version this runs on,
|
|
# we cannot parse past the ESP header to the inner IP header, and thus have to
|
|
# workaround in this manner
|
|
if inner_version == 4:
|
|
ip_hdr_options = {
|
|
'id': scapy.IP(str(pkt.payload)[8:]).id,
|
|
'flags': scapy.IP(str(pkt.payload)[8:]).flags
|
|
}
|
|
else:
|
|
ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl}
|
|
|
|
expected = _GetNullAuthCryptTunnelModePkt(
|
|
inner_version, local_inner, tunnel.local, local_port, remote_inner,
|
|
tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
|
|
ip_hdr_options)
|
|
|
|
# Check outer header manually (Avoids having to overwrite outer header's
|
|
# id, flags or flow label)
|
|
self.assertSentPacket(tunnel, sa_info)
|
|
self.assertEqual(expected.src, pkt.src)
|
|
self.assertEqual(expected.dst, pkt.dst)
|
|
self.assertEqual(len(expected), len(pkt))
|
|
|
|
# Check everything else
|
|
self.assertEqual(str(expected.payload), str(pkt.payload))
|
|
|
|
def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
|
|
remote_inner):
|
|
"""Test both input and output paths over an encrypted IPsec interface.
|
|
|
|
This tests specifically makes sure that the both encryption and decryption
|
|
work together, as opposed to the _CheckTunnel(Input|Output) where the
|
|
input and output paths are tested separately, and using null encryption.
|
|
"""
|
|
src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
|
|
_TEST_REMOTE_PORT)
|
|
|
|
# Make sure it appeared on the underlying interface
|
|
pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
|
|
tunnel.out_sa.seq_num, None, tunnel.local,
|
|
tunnel.remote)
|
|
|
|
# Check that packet is not sent in plaintext
|
|
self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))
|
|
|
|
# Check src/dst
|
|
self.assertEqual(tunnel.local, pkt.src)
|
|
self.assertEqual(tunnel.remote, pkt.dst)
|
|
|
|
# Check that the interface statistics recorded the outbound packet
|
|
self.assertSentPacket(tunnel, tunnel.out_sa)
|
|
|
|
try:
|
|
# Swap the interface addresses to pretend we are the remote
|
|
self._SwapInterfaceAddress(
|
|
tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
|
|
|
|
# Swap the packet's IP headers and write it back to the underlying
|
|
# network.
|
|
pkt = TunTwister.TwistPacket(pkt)
|
|
read_sock, local_port = _CreateReceiveSock(inner_version,
|
|
_TEST_REMOTE_PORT)
|
|
self.ReceivePacketOn(tunnel.underlying_netid, pkt)
|
|
|
|
# Verify that the packet data and src are correct
|
|
data, src = read_sock.recvfrom(4096)
|
|
self.assertEqual(net_test.UDP_PAYLOAD, data)
|
|
self.assertEqual((local_inner, src_port), src[:2])
|
|
|
|
# Check that the interface statistics recorded the inbound packet
|
|
self.assertReceivedPacket(tunnel, tunnel.in_sa)
|
|
finally:
|
|
# Swap the interface addresses to pretend we are the remote
|
|
self._SwapInterfaceAddress(
|
|
tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
|
|
|
|
def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
|
|
sa_info=None):
|
|
"""Test ICMP error path over an IPsec interface."""
|
|
if sa_info is None:
|
|
sa_info = tunnel.out_sa
|
|
# Now attempt to provoke an ICMP error.
|
|
# TODO: deduplicate with multinetwork_test.py.
|
|
dst_prefix, intermediate = {
|
|
4: ("172.19.", "172.16.9.12"),
|
|
6: ("2001:db8::", "2001:db8::1")
|
|
}[tunnel.version]
|
|
|
|
local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
|
|
_TEST_REMOTE_PORT)
|
|
pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
|
|
sa_info.seq_num, None, tunnel.local,
|
|
tunnel.remote)
|
|
self.assertSentPacket(tunnel, sa_info)
|
|
|
|
myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
|
|
_, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
|
|
pkt)
|
|
self.ReceivePacketOn(tunnel.underlying_netid, toobig)
|
|
|
|
# Check that the packet too big reduced the MTU.
|
|
routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
|
|
self.assertEqual(1, len(routes))
|
|
rtmsg, attributes = routes[0]
|
|
self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
|
|
self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
|
|
|
|
# Clear PMTU information so that future tests don't have to worry about it.
|
|
self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
|
|
|
|
def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
|
|
remote_inner):
|
|
"""Test combined encryption path with ICMP errors over an IPsec tunnel"""
|
|
self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
|
|
remote_inner)
|
|
self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
|
|
self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
|
|
remote_inner)
|
|
|
|
def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
|
|
"""Bootstrap method to setup and run tests for the given parameters."""
|
|
tunnel = self.randomTunnel(outer_version)
|
|
|
|
try:
|
|
# Some tests require that the out_seq_num and in_seq_num are the same
|
|
# (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1
|
|
#
|
|
# Until we get better scapy support, the only way we can build an
|
|
# encrypted packet is to send it out, and read the packet from the wire.
|
|
# We then generally use this as the "inbound" encrypted packet, injecting
|
|
# it into the interface for which it is expected on.
|
|
#
|
|
# As such, this is required to ensure that encrypted packets (which we
|
|
# currently have no way to easily modify) are not considered replay
|
|
# attacks by the inbound SA. (eg: received 3 packets, seq_num_in = 3,
|
|
# sent only 1, # seq_num_out = 1, inbound SA would consider this a replay
|
|
# attack)
|
|
tunnel.TeardownXfrm()
|
|
tunnel.SetupXfrm(use_null_crypt)
|
|
|
|
local_inner = tunnel.addrs[inner_version]
|
|
remote_inner = _GetRemoteInnerAddress(inner_version)
|
|
|
|
for i in range(2):
|
|
func(tunnel, inner_version, local_inner, remote_inner)
|
|
finally:
|
|
if use_null_crypt:
|
|
tunnel.TeardownXfrm()
|
|
tunnel.SetupXfrm(False)
|
|
|
|
def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner):
|
|
old_out_sa = tunnel.out_sa
|
|
old_in_sa = tunnel.in_sa
|
|
|
|
# Check to make sure that both directions work before rekey
|
|
self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
|
|
old_in_sa)
|
|
self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
|
|
old_out_sa)
|
|
|
|
# Rekey
|
|
outer_family = net_test.GetAddressFamily(tunnel.version)
|
|
|
|
# Create new SA
|
|
# Distinguish the new SAs with new SPIs.
|
|
new_out_sa = SaInfo(old_out_sa.spi + 1)
|
|
new_in_sa = SaInfo(old_in_sa.spi + 1)
|
|
|
|
# Perform Rekey
|
|
tunnel.Rekey(outer_family, new_out_sa, new_in_sa)
|
|
|
|
# Expect that the old SPI still works for inbound packets
|
|
self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
|
|
old_in_sa)
|
|
|
|
# Test both paths with new SPIs, expect outbound to use new SPI
|
|
self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
|
|
new_in_sa)
|
|
self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
|
|
new_out_sa)
|
|
|
|
# Delete old SAs
|
|
tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi)
|
|
|
|
# Test both paths with new SPIs; should still work
|
|
self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
|
|
new_in_sa)
|
|
self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner,
|
|
new_out_sa)
|
|
|
|
# Expect failure upon trying to receive a packet with the deleted SPI
|
|
self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner,
|
|
old_in_sa, True)
|
|
|
|
def _TestTunnelRekey(self, inner_version, outer_version):
|
|
"""Test packet input and output over a Virtual Tunnel Interface."""
|
|
tunnel = self.randomTunnel(outer_version)
|
|
|
|
try:
|
|
# Always use null_crypt, so we can check input and output separately
|
|
tunnel.TeardownXfrm()
|
|
tunnel.SetupXfrm(True)
|
|
|
|
local_inner = tunnel.addrs[inner_version]
|
|
remote_inner = _GetRemoteInnerAddress(inner_version)
|
|
|
|
self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner)
|
|
finally:
|
|
tunnel.TeardownXfrm()
|
|
tunnel.SetupXfrm(False)
|
|
|
|
|
|
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
|
|
class XfrmVtiTest(XfrmTunnelBase):
|
|
|
|
INTERFACE_CLASS = VtiInterface
|
|
|
|
def ParamTestVtiInput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
|
|
|
|
def ParamTestVtiOutput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
|
|
True)
|
|
|
|
def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
|
|
False)
|
|
|
|
def ParamTestVtiIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
|
|
|
|
def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version,
|
|
self._CheckTunnelEncryptionWithIcmp, False)
|
|
|
|
def ParamTestVtiRekey(self, inner_version, outer_version):
|
|
self._TestTunnelRekey(inner_version, outer_version)
|
|
|
|
|
|
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
|
|
class XfrmInterfaceTest(XfrmTunnelBase):
|
|
|
|
INTERFACE_CLASS = XfrmInterface
|
|
|
|
def ParamTestXfrmIntfInput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
|
|
|
|
def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
|
|
True)
|
|
|
|
def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
|
|
False)
|
|
|
|
def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
|
|
|
|
def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version,
|
|
self._CheckTunnelEncryptionWithIcmp, False)
|
|
|
|
def ParamTestXfrmIntfRekey(self, inner_version, outer_version):
|
|
self._TestTunnelRekey(inner_version, outer_version)
|
|
|
|
@unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, "XFRM migration unsupported")
|
|
class XfrmInterfaceMigrateTest(XfrmTunnelBase):
|
|
# TODO: b/172497215 There is a kernel issue that XFRM_MIGRATE cannot work correctly
|
|
# when there are multiple tunnels with the same selectors. Thus before this issue
|
|
# is fixed, #allowMultipleTunnels must be overridden to avoid setting up multiple
|
|
# tunnels. This need to be removed after the kernel issue is fixed.
|
|
@classmethod
|
|
def allowMultipleTunnels(cls):
|
|
return False
|
|
|
|
def setUpTunnel(self, outer_version, use_null_crypt):
|
|
underlying_netid = self.RandomNetid()
|
|
netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET
|
|
iface = "ipsec%s" % netid
|
|
ifindex = self.ifindices[underlying_netid]
|
|
|
|
local = self.MyAddress(outer_version, underlying_netid)
|
|
remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR
|
|
|
|
tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex,
|
|
local, remote, outer_version, use_null_crypt)
|
|
self._SetInboundMarking(netid, iface, True)
|
|
self._SetupTunnelNetwork(tunnel, True)
|
|
|
|
return tunnel
|
|
|
|
def tearDownTunnel(self, tunnel):
|
|
self._SetInboundMarking(tunnel.netid, tunnel.iface, False)
|
|
self._SetupTunnelNetwork(tunnel, False)
|
|
tunnel.Teardown()
|
|
|
|
def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
|
|
try:
|
|
tunnel = self.setUpTunnel(outer_version, use_null_crypt)
|
|
|
|
# Verify functionality before migration
|
|
local_inner = tunnel.addrs[inner_version]
|
|
remote_inner = _GetRemoteInnerAddress(inner_version)
|
|
func(tunnel, inner_version, local_inner, remote_inner)
|
|
|
|
# Migrate tunnel
|
|
# TODO:b/169170981 Add tests that migrate 4 -> 6 and 6 -> 4
|
|
new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid)
|
|
new_local = self.MyAddress(outer_version, new_underlying_netid)
|
|
new_remote = net_test.IPV4_ADDR2 if outer_version == 4 else net_test.IPV6_ADDR2
|
|
|
|
tunnel.Migrate(new_underlying_netid, new_local, new_remote)
|
|
|
|
# Verify functionality after migration
|
|
func(tunnel, inner_version, local_inner, remote_inner)
|
|
finally:
|
|
self.tearDownTunnel(tunnel)
|
|
|
|
def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
|
|
|
|
def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
|
|
True)
|
|
|
|
def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
|
|
False)
|
|
|
|
def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
|
|
|
|
def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version,
|
|
self._CheckTunnelEncryptionWithIcmp, False)
|
|
|
|
def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version):
|
|
self._TestTunnel(inner_version, outer_version, self._CheckTunnelRekey,
|
|
True)
|
|
|
|
if __name__ == "__main__":
|
|
InjectTests()
|
|
unittest.main()
|