# unison/bindings/python/ns__init__.py

import builtins
from copy import copy
from functools import lru_cache
import glob
import os.path
import sys
import sysconfig
import re
# Header and library directories reported by sysconfig for this interpreter.
# sysconfig.get_config_var returns None when a variable is not defined on the
# current platform, so either constant may be None.
DEFAULT_INCLUDE_DIR = sysconfig.get_config_var("INCLUDEDIR")
DEFAULT_LIB_DIR = sysconfig.get_config_var("LIBDIR")
def find_ns3_lock(start_dir=None):
    """Locate the ns-3 build lock file of the enclosing source tree.

    Walks upwards from ``start_dir`` until a directory containing an entry
    named ``ns3`` (the ns3 script) is found or the filesystem root is reached,
    then looks for ``.lock-ns3_<sys.platform>_build`` in that directory.

    Args:
        start_dir: Directory the upward walk starts from. Defaults to the
            directory containing this file.

    Returns:
        The path of the lock file, or None when the directory where the walk
        stopped does not contain one.
    """
    if start_dir is None:
        # Get the absolute path to the directory holding this file
        start_dir = os.path.dirname(os.path.abspath(__file__))
    candidate_dir = os.path.abspath(start_dir)
    lock_file = ".lock-ns3_%s_build" % sys.platform

    # Move upwards until we reach the directory with the ns3 script
    while "ns3" not in os.listdir(candidate_dir):
        parent_dir = os.path.dirname(candidate_dir)
        if parent_dir == candidate_dir:
            # dirname() of the filesystem root is the root itself: give up
            break
        candidate_dir = parent_dir

    # We should be now at the directory that contains a lock if the project is configured
    if lock_file in os.listdir(candidate_dir):
        return os.path.join(candidate_dir, lock_file)
    return None
# Directories searched (non-recursively) for system libraries: the library
# directory reported by sysconfig and its parent. LIBDIR is not defined on
# every platform, in which case DEFAULT_LIB_DIR is None; the tuple is then left
# empty instead of failing in os.path.dirname(None) while importing the module.
SYSTEM_LIBRARY_DIRECTORIES = (
    (DEFAULT_LIB_DIR, os.path.dirname(DEFAULT_LIB_DIR)) if DEFAULT_LIB_DIR else ()
)

# Dynamic library file extension for each supported sys.platform value
DYNAMIC_LIBRARY_EXTENSIONS = {
    "linux": "so",
    "win32": "dll",
    "darwin": "dylib",
}

# Extension used on the current platform (KeyError on any other platform)
LIBRARY_EXTENSION = DYNAMIC_LIBRARY_EXTENSIONS[sys.platform]
def trim_library_path(library_path: str) -> str:
    """Reduce a library path to its bare library name.

    Only the file name is kept; everything from its first dot onwards
    (extension and any version suffix) is cut and a leading ``lib`` is
    stripped.
    """
    stem, _, _ = os.path.basename(library_path).partition(".")
    return stem[3:] if stem.startswith("lib") else stem
@lru_cache(maxsize=None)
def _search_libraries() -> dict:
    """Index the dynamic libraries found in the known search locations.

    The directories listed in PATH and LD_LIBRARY_PATH, the current working
    directory and the directory of this file (plus its parent and
    grandparent) are searched recursively; SYSTEM_LIBRARY_DIRECTORIES are
    searched non-recursively. The result is cached, so the filesystem is
    scanned once per process.

    Returns:
        A dict mapping each trimmed library name (see trim_library_path) to
        the list of paths found for it.
    """
    # Otherwise, search for ns-3 libraries
    # Should be the case when ns-3 is installed as a package
    env_sep = ";" if sys.platform == "win32" else ":"

    # Search in default directories PATH and LD_LIBRARY_PATH
    library_search_paths = []
    for variable in ("PATH", "LD_LIBRARY_PATH"):
        # Drop EVERY empty entry (unset variable, doubled or trailing
        # separator): an empty search path would turn the recursive glob
        # below into a scan of the whole filesystem ("/**/*.<ext>*").
        library_search_paths += [path for path in os.getenv(variable, "").split(env_sep) if path]

    # And the current working directory too
    library_search_paths.append(os.path.abspath(os.getcwd()))

    # And finally the directory containing this file, its parent and its grandparent
    this_dir = os.path.abspath(os.path.dirname(__file__))
    parent_dir = os.path.dirname(this_dir)
    library_search_paths += [this_dir, parent_dir, os.path.dirname(parent_dir)]

    # The same directory may be listed several times; glob each one only once
    library_search_paths = list(dict.fromkeys(library_search_paths))

    # Search for the libraries in the search paths
    libraries = []
    for search_path in library_search_paths:
        libraries += glob.glob("%s/**/*.%s*" % (search_path, LIBRARY_EXTENSION), recursive=True)

    # Search system library directories (too slow for recursive search)
    for search_path in SYSTEM_LIBRARY_DIRECTORIES:
        libraries += glob.glob("%s/**/*.%s*" % (search_path, LIBRARY_EXTENSION), recursive=False)

    # Organize libraries into a map, using sets to discard duplicated paths
    library_map = {}
    for library in libraries:
        library_map.setdefault(trim_library_path(library), set()).add(library)

    # Replace sets with lists
    return {name: list(paths) for name, paths in library_map.items()}
def search_libraries(library_name: str) -> list:
    """Return the paths of every indexed library matching ``library_name``.

    The name is trimmed with trim_library_path and matched as a substring
    against the keys of the cached library index; the path lists of all
    matching keys are concatenated in index order.
    """
    libraries_map = _search_libraries()
    wanted = trim_library_path(library_name)
    return [
        path
        for name, paths in libraries_map.items()
        if wanted in name
        for path in paths
    ]
def extract_library_include_dirs(library_name: str, prefix: str) -> list:
    """Guess the include directories of the libraries a module links to.

    The binary ``<prefix>/lib/<library_name>`` is scanned for embedded
    ``lib<name>.<extension>`` strings. Each of them that is not an ns-3
    module is looked up with search_libraries and candidate include
    directories are derived from the location of the shortest matching path.

    Args:
        library_name: File name of the library inside ``<prefix>/lib``.
        prefix: Directory that contains the ``lib`` directory.

    Returns:
        A list (in no particular order) of include directory candidates.

    Raises:
        Exception: If a linked library cannot be found by search_libraries.
        SystemExit: If the library file cannot be read or scanned.
    """
    library_path = "%s/lib/%s" % (prefix, library_name)

    # A linked library shows up in the binary as a NUL-preceded string
    # "lib<name>.<extension>". The name must not run across a NUL byte
    # (that would glue unrelated strings together) and the dot before the
    # extension is matched literally.
    extension = LIBRARY_EXTENSION.encode("utf-8")
    linked_lib_regex = b"\x00(lib[^\x00]*?\\." + re.escape(extension) + b")"

    # First discover which 3rd-party libraries are used by the current module
    try:
        with open(os.path.abspath(library_path), "rb") as f:
            linked_libs = re.findall(linked_lib_regex, f.read())
    except Exception as e:
        print("Failed to extract libraries used by {library} with exception:{exception}"
              .format(library=library_path, exception=e))
        # sys.exit instead of the site-provided exit(), which is not always available
        sys.exit(-1)

    linked_libs_include_dirs = set()
    # Now find these libraries and add a few include paths for them
    for linked_library in (lib.decode("utf-8") for lib in linked_libs):
        # Skip ns-3 modules
        if "libns3" in linked_library:
            continue

        # Search for the absolute path of the library
        candidate_paths = search_libraries(linked_library)

        # Raise error in case the library can't be found
        if not candidate_paths:
            raise Exception(
                "Failed to find {library}. Make sure its library directory is in LD_LIBRARY_PATH.".format(
                    library=linked_library))

        # Get path with shortest length
        linked_library_path = min(candidate_paths, key=len)

        # If library is part of the ns-3 build, continue without any new includes
        if prefix in linked_library_path:
            continue

        # If it is part of the system directories, try to find it
        system_include_dir = os.path.dirname(linked_library_path).replace("lib", "include")
        if os.path.exists(system_include_dir):
            linked_libs_include_dirs.add(system_include_dir)

        # If system_include_dir/library_name exists, we add it too.
        # Only the leading "lib" and the trailing extension are stripped, so
        # that e.g. "libglib-2.0.so" yields "glib-2.0" rather than "g-2.0".
        linked_library_name = linked_library
        if linked_library_name.startswith("lib"):
            linked_library_name = linked_library_name[len("lib"):]
        if linked_library_name.endswith("." + LIBRARY_EXTENSION):
            linked_library_name = linked_library_name[:-(len(LIBRARY_EXTENSION) + 1)]
        named_include_dir = os.path.join(system_include_dir, linked_library_name)
        if os.path.exists(named_include_dir):
            linked_libs_include_dirs.add(named_include_dir)

        # In case it isn't, include new include directories based on the path:
        # the directory holding the library and the parent of that directory
        library_dir = os.path.dirname(linked_library_path)
        linked_libs_include_dirs.update((library_dir, os.path.dirname(library_dir)))

    # Also offer the "include" subdirectory of every directory collected so far
    for lib_path in list(linked_libs_include_dirs):
        inc_path = os.path.join(lib_path, "include")
        if os.path.exists(inc_path):
            linked_libs_include_dirs.add(inc_path)
    return list(linked_libs_include_dirs)
def load_modules():
    """Load the ns-3 libraries through cppyy and return the ``ns3`` namespace.

    When a build lock file is found (see find_ns3_lock), the enabled modules,
    version, build profile and output directory are read from it. Otherwise
    ns-3 libraries are discovered with search_libraries and, for every
    module, only the newest version found is kept.

    The module headers are then included, the libraries are loaded and a few
    helpers are attached to the namespace: Time comparison operators, Node
    lifetime handling, address conversion helpers, LookupByNameFailSafe,
    CreateObject and GetObject.

    Returns:
        The ``cppyy.gbl.ns3`` namespace object.

    Raises:
        Exception: If no ns-3 library is found, or the library of an enabled
            module is missing from the build directory.
        SystemExit: If cppyy cannot be imported.
    """
    lock_file = find_ns3_lock()
    libraries_to_load = []

    if lock_file:
        # Load NS3_ENABLED_MODULES from the lock file inside the build directory
        values = {}

        # If we find a lock file, load the ns-3 modules from it
        # Should be the case when running from the source directory
        # NOTE: the lock file is executed as Python code; it is a build
        # artifact and must be trusted.
        with open(lock_file) as lock_file_handle:
            exec(lock_file_handle.read(), {}, values)
        suffix = ("-" + values["BUILD_PROFILE"]) if values["BUILD_PROFILE"] != "release" else ""
        modules = [module.replace("ns3-", "") for module in values["NS3_ENABLED_MODULES"]]
        prefix = values["out_dir"]
        libraries = {os.path.splitext(os.path.basename(x))[0]: x for x in os.listdir(os.path.join(prefix, "lib"))}
        version = values["VERSION"]

        # Filter out test libraries and incorrect versions
        def filter_in_matching_ns3_libraries(libraries_to_filter: dict, modules_to_filter: list, version: str) -> dict:
            for module in modules_to_filter:
                # Iterate over a copy of the keys, since entries are popped
                for library in list(libraries_to_filter.keys()):
                    if module not in library:
                        continue
                    if "-".join([version, module]) not in library:
                        libraries_to_filter.pop(library)
                    elif "test" in library:
                        libraries_to_filter.pop(library)
            return libraries_to_filter

        libraries = filter_in_matching_ns3_libraries(libraries, modules, version)
    else:
        libraries = search_libraries("ns3")

        if not libraries:
            raise Exception("ns-3 libraries were not found.")

        # The prefix is the directory with the lib directory
        # libns3-dev-core.so/../../
        prefix = os.path.dirname(os.path.dirname(libraries[0]))

        # Remove test libraries
        libraries = list(filter(lambda x: "test" not in x, libraries))

        # Extract version and build suffix (if it exists)
        def filter_module_name(library):
            library = os.path.splitext(os.path.basename(library))[0]
            components = library.split("-")

            # Remove version-related prefixes
            if "libns3" in components[0]:
                components.pop(0)
            if "dev" == components[0]:
                components.pop(0)
            if "rc" in components[0]:
                components.pop(0)

            # Drop build profile suffix and test libraries
            if components[-1] in ["debug", "default", "optimized", "release", "relwithdebinfo"]:
                components.pop(-1)
            return "-".join(components)

        # Filter out module names
        modules = {filter_module_name(library) for library in libraries}

        def extract_version(library: str, module: str) -> str:
            return os.path.basename(library).replace("libns", "").split(module)[0][:-1]

        def get_newest_version(versions: list) -> str:
            versions = list(sorted(versions))
            if "dev" in versions[0]:
                return versions[0]

            # Check if there is a release of a possible candidate
            try:
                pos = versions.index(versions[-1].split('-')[0])
            except ValueError:
                pos = None

            # Remove release candidates
            if pos is not None:
                return versions[pos]
            else:
                return versions[-1]

        def filter_in_newest_ns3_libraries(libraries_to_filter: list, modules_to_filter: list) -> list:
            # Filter out older ns-3 libraries
            for module in list(modules_to_filter):
                # Filter duplicates of modules, while excluding test libraries
                conflicting_libraries = list(filter(lambda x: module in x, libraries_to_filter))

                # Extract versions from conflicting libraries
                conflicting_libraries_versions = list(map(lambda x: extract_version(x, module), conflicting_libraries))
                newest_version = get_newest_version(conflicting_libraries_versions)

                for conflicting_library in conflicting_libraries:
                    if "-".join([newest_version, module]) not in conflicting_library:
                        # Remove from the list received as argument, not from
                        # the variable of the enclosing scope
                        libraries_to_filter.remove(conflicting_library)
            return libraries_to_filter

        # Get library base names
        libraries = filter_in_newest_ns3_libraries(libraries, modules)
        libraries_to_load = list(map(lambda x: os.path.basename(x), libraries))

    # Try to import Cppyy and warn the user in case it is not found
    try:
        import cppyy
    except ModuleNotFoundError:
        print("Cppyy is required by the ns-3 python bindings.")
        print("You can install it with the following command: pip install cppyy")
        sys.exit(-1)

    # Enable full logs for debugging
    # cppyy.set_debug(True)

    # Register Ptr<> as a smart pointer
    import libcppyy
    libcppyy.AddSmartPtrType('Ptr')

    # Import ns-3 libraries
    prefix = os.path.abspath(prefix)
    cppyy.add_library_path("%s/lib" % prefix)
    cppyy.add_include_path("%s/include" % prefix)

    if lock_file:
        # When we have the lock file, we assemble the correct library names
        for module in modules:
            library_name = "libns{version}-{module}{suffix}".format(
                version=version,
                module=module,
                suffix=suffix
            )
            if library_name not in libraries:
                raise Exception("Missing library %s\n" % library_name,
                                "Build all modules with './ns3 build'"
                                )
            libraries_to_load.append(libraries[library_name])

    known_include_dirs = set()
    # We then need to include all include directories for dependencies
    for library in libraries_to_load:
        for linked_lib_include_dir in extract_library_include_dirs(library, prefix):
            if linked_lib_include_dir not in known_include_dirs:
                known_include_dirs.add(linked_lib_include_dir)
                cppyy.add_include_path(linked_lib_include_dir)

    for module in modules:
        cppyy.include("ns3/%s-module.h" % module)

    # After including all headers, we finally load the modules
    for library in libraries_to_load:
        cppyy.load_library(library)

    # We expose cppyy to consumers of this module as ns.cppyy
    setattr(cppyy.gbl.ns3, "cppyy", cppyy)

    # To maintain compatibility with pybindgen scripts,
    # we set an attribute per module that just redirects to the upper object
    for module in modules:
        setattr(cppyy.gbl.ns3, module.replace("-", "_"), cppyy.gbl.ns3)

    # Set up a few tricks
    cppyy.cppdef("""
        using namespace ns3;
        bool Time_ge(Time& a, Time& b){ return a >= b;}
        bool Time_eq(Time& a, Time& b){ return a == b;}
        bool Time_ne(Time& a, Time& b){ return a != b;}
        bool Time_le(Time& a, Time& b){ return a <= b;}
        bool Time_gt(Time& a, Time& b){ return a > b;}
        bool Time_lt(Time& a, Time& b){ return a < b;}
        """)
    cppyy.gbl.ns3.Time.__ge__ = cppyy.gbl.Time_ge
    cppyy.gbl.ns3.Time.__eq__ = cppyy.gbl.Time_eq
    cppyy.gbl.ns3.Time.__ne__ = cppyy.gbl.Time_ne
    cppyy.gbl.ns3.Time.__le__ = cppyy.gbl.Time_le
    cppyy.gbl.ns3.Time.__gt__ = cppyy.gbl.Time_gt
    cppyy.gbl.ns3.Time.__lt__ = cppyy.gbl.Time_lt

    # Node::~Node isn't supposed to destroy the object,
    # since it gets destroyed at the end of the simulation
    # we need to hold the reference until it gets destroyed by C++
    #
    # Search for NodeList::Add (this)
    cppyy.gbl.ns3.__nodes_pending_deletion = []

    def Node_del(self: cppyy.gbl.ns3.Node) -> None:
        cppyy.gbl.ns3.__nodes_pending_deletion.append(self)
        return None

    cppyy.gbl.ns3.Node.__del__ = Node_del

    # Define ns.cppyy.gbl.addressFromIpv4Address and others
    cppyy.cppdef("""using namespace ns3;
        Address addressFromIpv4Address(Ipv4Address ip){ return Address(ip); };
        Address addressFromInetSocketAddress(InetSocketAddress addr){ return Address(addr); };
        Address addressFromPacketSocketAddress(PacketSocketAddress addr){ return Address(addr); };
        """)
    # Expose addressFromIpv4Address as a member of the ns3 namespace (equivalent to ns)
    setattr(cppyy.gbl.ns3, "addressFromIpv4Address", cppyy.gbl.addressFromIpv4Address)
    setattr(cppyy.gbl.ns3, "addressFromInetSocketAddress", cppyy.gbl.addressFromInetSocketAddress)
    setattr(cppyy.gbl.ns3, "addressFromPacketSocketAddress", cppyy.gbl.addressFromPacketSocketAddress)

    cppyy.cppdef("""
        using namespace ns3;
        std::tuple<bool, TypeId> LookupByNameFailSafe(std::string name)
        {
            TypeId id;
            bool ok = TypeId::LookupByNameFailSafe(name, &id);
            return std::make_tuple(ok, id);
        }
        """)
    setattr(cppyy.gbl.ns3, "LookupByNameFailSafe", cppyy.gbl.LookupByNameFailSafe)

    def CreateObject(className):
        """Instantiate ``className``, defining a C++ factory on first use."""
        try:
            # Reuse a factory defined by a previous call, if there is one
            try:
                func = "CreateObject%s" % re.sub('[<|>]', '_', className)
                return getattr(cppyy.gbl, func)()
            except AttributeError:
                pass
            try:
                func = "Create%s" % re.sub('[<|>]', '_', className)
                return getattr(cppyy.gbl, func)()
            except AttributeError:
                pass
            raise AttributeError
        except AttributeError:
            # No factory yet: first try one based on ns3::CreateObject<T>() ...
            try:
                func = "CreateObject%s" % re.sub('[<|>]', '_', className)
                cppyy.cppdef("""
                    using namespace ns3;
                    Ptr<%s> %s(){
                        Ptr<%s> object = CreateObject<%s>();
                        return object;
                    }
                    """ % (className, func, className, className)
                             )
            except Exception:
                # ... and fall back to a factory calling the constructor directly
                try:
                    func = "Create%s" % re.sub('[<|>]', '_', className)
                    cppyy.cppdef("""
                        using namespace ns3;
                        %s %s(){
                            %s object = %s();
                            return object;
                        }
                        """ % (className, func, className, className)
                                 )
                except Exception as e:
                    # Say why the process is being terminated before exiting
                    print("Failed to create an instance of {className} with exception:{exception}"
                          .format(className=className, exception=e))
                    sys.exit(-1)
        return getattr(cppyy.gbl, func)()

    setattr(cppyy.gbl.ns3, "CreateObject", CreateObject)

    def GetObject(parentObject, aggregatedObject):
        """Return the object of the requested type aggregated to ``parentObject``.

        ``aggregatedObject`` may be a class, an instance or a type name.
        """
        # Objects have __cpp_name__ attributes, so parentObject
        # should not have it while aggregatedObject can
        if hasattr(parentObject, "__cpp_name__"):
            raise Exception("Class was passed instead of an instance in parentObject")

        aggregatedIsClass = hasattr(aggregatedObject, "__cpp_name__")
        aggregatedIsString = isinstance(aggregatedObject, str)
        aggregatedIsInstance = not aggregatedIsClass and not aggregatedIsString

        if aggregatedIsClass:
            aggregatedType = aggregatedObject.__cpp_name__
        if aggregatedIsInstance:
            aggregatedType = aggregatedObject.__class__.__cpp_name__
        if aggregatedIsString:
            aggregatedType = aggregatedObject

        # NOTE(review): this specialization is defined on every call; confirm
        # how cppyy reacts when the same type is requested more than once.
        cppyy.cppdef(
            """using namespace ns3; template <> Ptr<%s> getAggregatedObject<%s>(Ptr<Object> parentPtr, %s param)
               {
                    return parentPtr->GetObject<%s>();
               }
            """ % (aggregatedType, aggregatedType, aggregatedType, aggregatedType)
        )
        # NOTE(review): for a type name given as a string the second argument
        # below is the Python str class - verify that this case works.
        return cppyy.gbl.getAggregatedObject(parentObject,
                                             aggregatedObject if aggregatedIsClass else aggregatedObject.__class__)

    setattr(cppyy.gbl.ns3, "GetObject", GetObject)
    return cppyy.gbl.ns3
# Load all modules and make them available via a built-in.
# This runs at import time, so importing this module loads the ns-3 libraries.
ns = load_modules()  # can be imported via 'from ns import ns'
builtins.__dict__['ns'] = ns  # or be made widely available with 'from ns import *'