作者归档:softsim

Android在高版本中如何允许访问非 SDK 接口

https://developer.android.com/guide/app-compatibility/restrictions-non-sdk-interfaces?hl=zh-cn

Android 10(API 级别 29)或更高版本

如需允许访问非 SDK 接口,请使用以下 adb 命令:

 adb shell settings put global hidden_api_policy 1

如需将 API 强制执行策略重置为默认设置,请使用以下命令:

adb shell settings delete global hidden_api_policy

可以将 API 强制执行策略中的整数设置为以下值之一:

0:停用所有非 SDK 接口检测。如果您使用此设置,系统会停止输出针对非 SDK 接口使用行为的所有日志消息,并阻止您使用 StrictMode API 测试应用。建议不要使用此设置。
1:允许访问所有非 SDK 接口,但同时输出日志消息,并且在其中显示针对所有非 SDK 接口使用情况的警告。如果使用此设置,您仍可以使用 StrictMode API 来测试应用。
2:禁止使用列于屏蔽名单中的非 SDK 接口,或针对您的目标 API 级别被有条件屏蔽的非 SDK 接口

在系统映像中的什么位置可以找到非 SDK API 名单?
它们是在平台 dex 文件中的字段和方法访问标记位中编码的。 系统镜像中没有单独的文件包含这些名单

搭载同一 Android 版本的不同 OEM 设备上的非 SDK API 名单是否相同?
原始设备制造商 (OEM) 可以将自己的接口添加到屏蔽名单(黑名单)中,但是无法从 AOSP 非 SDK API 名单中移除接口。CDD 会阻止此类更改,并且 CTS 测试会确保 Android 运行时强制执行相应名单。

针对非 SDK 接口的限制是否适用于包括系统应用和第一方应用在内的所有应用,而不仅仅是第三方应用?

是,但我们对使用平台密钥进行签名的应用和某些系统映像应用免除这项限制。请注意,免除限制的情况仅适用于系统映像应用(或更新后的系统映像应用)。软件包白名单仅适用于针对私有平台 API(而不是 SDK API)构建的应用(其中 LOCAL_PRIVATE_PLATFORM_APIS := true)。

JNI绕过去的方法:
破坏调用堆栈绕过去,使 VM 无法识别调用方

可以通过 JniEnv::AttachCurrentThread(…) 函数创建一个新的 Thread 来完成。

jni层新建个线程,在这个线程里去反射,去除掉了java调用的信息,从而让安卓系统以为这个是系统调用。

frameworks/base/config/hiddenapi-greylist.txt

APK Sig Block 42

块的结构

8字节  块的大小 (整个块的大小,不包括这8个字节)
ID-Value序列
8字节  块的大小 (值=上面的块大小)
16字节  Magic标识, 内容为"APK Sig Block 42"

ID-Value序列的结构

8字节的Length
4字节的ID1
Value1
----------
Length
ID2
Value2
----------
...
----------
Length
IDn
Valuen

APK的签名信息就是其中一项,ID为0x7109871A

1. 首先根据ECDR的魔数0x06054b50(hex:50 4b 05 06)确定ECDR部分的起始地址,也就是 从文件末尾往前搜索 504B0506
2. ECDR的起始地址+16, 就存储着, 中央目录起始地址

—————————

1. 确定中央目录地址(中央目录地址刚好是APK签名地址的尾地址)
2. 根据中央目录地址和APK签名块的长度确定ID-Value序列的首地址和尾地址
————————-

APK 签名方案 v2分块是一个签名序列,说明可以使用多个签名者对同一个APK进行签名。每个签名信息中均包含了三个部分的内容:

1. 带长度前缀的signed data
其中包含了通过一系列算法计算的摘要列表、证书信息,以及extra信息(可选);

2. 带长度前缀的signatures序列
通过一系列算法对signed data的签名列表。签名时使用了多个签名算法,在签名校验时会是选择系统支持的安全系数最高的签名进行校验;

3. 证书公钥

android的so在proc maps中找不到了

在构建应用的发布版本时,请确保在应用的 build.gradle 文件中将 useLegacyPackaging 设置为 false,以便将未压缩的 .so 文件打包到 APK 中。停用此标志可防止 PackageManager 在安装过程中将 .so 文件从 APK 复制到文件系统,并具有减小应用更新的额外好处。


从 AGP 4.2.0 开始,DSL 选项 useLegacyPackaging 取代了 extractNativeLibs 清单属性。请使用应用的 build.gradle 文件中的 useLegacyPackaging(而非清单文件/manifest中的 extractNativeLibs)来配置原生库压缩行为。


extractNativeLibs属性指示软件包安装程序是否将原生库从 APK 提取到文件系统。如果设置为 “false”,则原生库以未压缩的形式存储在 APK 中。虽然您的 APK 可能较大,但应用加载速度更快,因为库是在应用运行时直接从 APK 加载。

extractNativeLibs 的默认值取决于 minSdkVersion 和您使用的 AGP 版本。在大多数情况下,默认行为很可能符合您的预期,您无需显式设置此属性。


安卓高版本在安装apk时可以不解压lib中的so文件,而将其直接映射到内存中实现加载


建议以未压缩的形式打包原生库,因为这会减小应用安装大小,缩减应用下载大小,并缩短用户的应用加载时间。不过,如果您希望 Android Gradle 插件在构建应用时打包压缩后的原生库,请在应用的 build.gradle 文件中将 useLegacyPackaging 设置为 true:

android {
    packagingOptions {
        jniLibs {
            useLegacyPackaging true
        }
    }
}

在逆向中很常用的一个技巧就是对apk安装后的so文件进行patch然后替换原来的so文件,这样可以绕过签名校验。这种方式的前提是extractNativeLibs必须为true,否则在apk安装目录下并不会找到任何的so文件可以让我们进行patch。

地址                  权限    偏移  dev  inode                          路径
7b7f633000-7b7f635000 r--s 000a5000 fe:06 490828                         /data/app/~~dma2nKNikFTmU3qVUUlOuw==/xxx-t6ddVoyboOdWOHRpLk9xhQ==/base.apk
7b7f635000-7b7f637000 r--s 000b0000 fe:06 490828                         /data/app/~~dma2nKNikFTmU3qVUUlOuw==/xxx-t6ddVoyboOdWOHRpLk9xhQ==/base.apk

其中,地址  表示在 进程的地址空间的 起始地址
    权限, 表示该区段  是否可读,可写, 可执行, 可共享           上面就表示 可读, 可共享
偏移: If the region was mapped from a file (using mmap), this is the offset in the file where the mapping begins. If the memory was not mapped from a file, it's just 0.

设备:  If the region was mapped from a file, this is the major and minor device number (in hex) where the file lives.

inode:  If the region was mapped from a file, this is the file number(文件号)

路径:   If the region was mapped from a file, this is the name of the file. This field is blank for anonymous mapped regions.


在搭载 Android 10(API 级别 29)和更高版本的设备上,您现在可以告知平台直接从应用的 APK 文件运行嵌入式 DEX 代码。如果攻击者曾设法篡改了设备上本地编译的代码,此选项有助于防止这类攻击。

1) 在应用清单文件的 application 元素中将 android::useEmbeddedDex 属性设置为 true。
2) 在模块级 build.gradle.kts 文件(如果您使用的是 Groovy,则为 build.gradle 文件)中将 useLegacyPackaging 设置为 false


New approach introduced by Google in Marshmallow (Android 6) is enabled by setting extractNativeLibs to “false”. It expects the libraries stored uncompressed in the APK (STORE method) and zipaligned. There’s no need to extract them during installation. On app startup, the libraries can be loaded (memmapped) directly from the APK.

Android 6.0之后,就可以, app在启动时,直接从apk载入 lib, (mem mapped)


从Android 6.0 & AGP 3.6.0开始,系统支持直接加载apk中未压缩的so,也就是说在App安装时,系统不再将apk中的so解压,而在加载so时,直接从apk中加载。

findLibrary最终的实现是在DexPathList.findLibrary中,此时libraryName为”mytest”,经过System.mapLibraryName转换后,得到fileName为”libmytest.so”

加载so是先查找App路径下,然后再查找系统路径。通过前缀,也能发现,支持从zip文件base.apk中直接加载so

base.apk!/lib/arm64-v8a/libmytest.so

NativeLoaderNamespace::Load最终调用android_dlopen_ext加载所需so,采用Flag RTLD_NOW执行立即加载,android_dlopen_ext为Android扩展的dlopen实现,至此可以发现,Android的System.loadLibrary底层调用android_dlopen_ext来加载so,而非OpenJDK采用的dlopen

__loader_android_dlopen_ext

如果是直接从apk中加载so。name将类似于/data/app/~~WdKfQO1G6r3htDT7Rgo1DQ==/com.huchao.mysystemloadlibrary-6wbqoASC9saPFntEre3_MQ==/base.apk!/lib/arm64-v8a/libmytest.so,其路径中包含kZipFileSeparator(!/)将去apk文件中查找,调用open_library_in_zipfile,返回打开的apk文件的fd。

x25519

python有两个实现

https://gist.github.com/nickovs/cc3c22d15f239a2640c185035c06f8a3
保存为 curve25519.py

"""A pure Python implementation of Curve25519

This module supports both a low-level interface through curve25519(base_point, secret)
and curve25519_base(secret) that take 32-byte blocks of data as inputs and a higher
level interface using the X25519PrivateKey and X25519PublicKey classes that are
compatible with the classes in cryptography.hazmat.primitives.asymmetric.x25519 with
the same names.
"""

# By Nicko van Someren, 2021. This code is released into the public domain.

#                                  #### WARNING ####

# Since this code makes use of Python's built-in large integer types, it is NOT EXPECTED
# to run in constant time. While some effort is made to minimise the time variations,
# the underlying math functions are likely to have running times that are highly
# value-dependent, leaving this code potentially vulnerable to timing attacks. If this
# code is to be used to provide cryptographic security in an environment where the start
# and end times of the execution can be guessed, inferred or measured then it is critical
# that steps are taken to hide the execution time, for instance by adding a delay so that
# encrypted packets are not sent until a fixed time after the _start_ of execution.


# Implements ladder multiplication as described in "Montgomery curves and the Montgomery
# ladder" by Daniel J. Bernstein and Tanja Lange. https://eprint.iacr.org/2017/293.pdf

# Curve25519 is a Montgomery curve defined by:
# y**2 = x**3 + A * x**2 + x  mod P
# where P = 2**255-19 and A = 486662

P = 2 ** 255 - 19
_A = 486662


def _point_add(point_n, point_m, point_diff):
    """Given the projection of two points and their difference, return their sum"""
    (xn, zn) = point_n
    (xm, zm) = point_m
    (x_diff, z_diff) = point_diff
    x = (z_diff << 2) * (xm * xn - zm * zn) ** 2
    z = (x_diff << 2) * (xm * zn - zm * xn) ** 2
    return x % P, z % P


def _point_double(point_n):
    """Double a point provided in projective coordinates"""
    (xn, zn) = point_n
    xn2 = xn ** 2
    zn2 = zn ** 2
    x = (xn2 - zn2) ** 2
    xzn = xn * zn
    z = 4 * xzn * (xn2 + _A * xzn + zn2)
    return x % P, z % P


def _const_time_swap(a, b, swap):
    """Swap two values in constant time"""
    index = int(swap) * 2
    temp = (a, b, b, a)
    return temp[index:index+2]


def _raw_curve25519(base, n):
    """Raise the point base to the power n"""
    zero = (1, 0)
    one = (base, 1)
    mP, m1P = zero, one

    for i in reversed(range(256)):
        bit = bool(n & (1 << i))
        mP, m1P = _const_time_swap(mP, m1P, bit)
        mP, m1P = _point_double(mP), _point_add(mP, m1P, one)
        mP, m1P = _const_time_swap(mP, m1P, bit)

    x, z = mP
    inv_z = pow(z, P - 2, P)
    return (x * inv_z) % P


def _unpack_number(s):
    """Unpack 32 bytes to a 256 bit value"""
    if len(s) != 32:
        raise ValueError('Curve25519 values must be 32 bytes')
    return int.from_bytes(s, "little")


def _pack_number(n):
    """Pack a value into 32 bytes"""
    return n.to_bytes(32, "little")


def _fix_secret(n):
    """Mask a value to be an acceptable exponent"""
    n &= ~7
    n &= ~(128 << 8 * 31)
    n |= 64 << 8 * 31
    return n


def curve25519(base_point_raw, secret_raw):
    """Raise the base point to a given power"""
    base_point = _unpack_number(base_point_raw)
    secret = _fix_secret(_unpack_number(secret_raw))
    return _pack_number(_raw_curve25519(base_point, secret))


def curve25519_base(secret_raw):
    """Raise the generator point to a given power"""
    secret = _fix_secret(_unpack_number(secret_raw))
    return _pack_number(_raw_curve25519(9, secret))


class X25519PublicKey:
    def __init__(self, x):
        self.x = x

    @classmethod
    def from_public_bytes(cls, data):
        return cls(_unpack_number(data))

    def public_bytes(self):
        return _pack_number(self.x)


class X25519PrivateKey:
    def __init__(self, a):
        self.a = a

    @classmethod
    def from_private_bytes(cls, data):
        return cls(_fix_secret(_unpack_number(data)))

    def private_bytes(self):
        return _pack_number(self.a)

    def public_key(self):
        return _pack_number(_raw_curve25519(9, self.a))

    def exchange(self, peer_public_key):
        if isinstance(peer_public_key, bytes):
            peer_public_key = X25519PublicKey.from_public_bytes(peer_public_key)
        return _pack_number(_raw_curve25519(peer_public_key.x, self.a))

调用

from binascii import hexlify, unhexlify
from curve25519 import X25519PrivateKey

private_key     = unhexlify('')
peer_public_key = unhexlify('')

x25519x = X25519PrivateKey.from_private_bytes(private_key)

shared_secret = x25519x.exchange(peer_public_key)

print(hexlify(shared_secret))

另外一个实现
https://github.com/1ocalhost/x25519

调用方法

from binascii import hexlify, unhexlify
import x25519

private_key    = unhexlify('')
public_key = x25519.scalar_base_mult(private_key)
print(hexlify(public_key))

peer_public_key = unhexlify('')
shared_secret = x25519.scalar_mult(private_key, peer_public_key)
print(hexlify(shared_secret))

生成key

aaa = x25519.djbec.genkey()
bbb = x25519.djbec.encodeint(aaa)
print('priv=', hexlify(bbb))

rust

use rand_core::{OsRng, RngCore};
use x25519_dalek::{EphemeralSecret, StaticSecret, PublicKey,  x25519};



use hex::FromHex;


fn main() {

    //let alice_secret = StaticSecret::new(OsRng);
    //let alice_secret = StaticSecret::random_from_rng(&mut OsRng);
    
    let mut x4 = [0u8; 32];
    let _ = hex::decode_to_slice("", &mut x4 as &mut [u8]).unwrap();
    let alice_secret = StaticSecret::from(x4);    
    
    let xxx = alice_secret.to_bytes();  
    println!("alice secret={}", hex::encode_upper(&xxx));
    
    let alice_public = PublicKey::from(&alice_secret);
    
    
    // let bob_secret = StaticSecret::new(OsRng);
    //     let bob_secret = StaticSecret::random_from_rng(&mut OsRng);
    //let tmp = hex::decode("").unwrap();
    //let x4: [u8; 32] = (&tmp[..]).try_into().unwrap();    
    let x4 = <[u8; 32]>::from_hex("").unwrap();
    
    let bob_secret = StaticSecret::from(x4);     
    let bob_public = PublicKey::from(&bob_secret);
    
    let yyy = bob_secret.to_bytes();
    println!("bob secret={}", hex::encode_upper(&yyy));
    
    let zzz = bob_public.to_bytes();
    println!("bob public={}", hex::encode_upper(&zzz));    
    
    //let alice_shared_secret = alice_secret.diffie_hellman(&bob_public);
    //let bob_shared_secret = bob_secret.diffie_hellman(&alice_public);
    let alice_shared = x25519(xxx, zzz);
    let bob_shared = x25519(yyy, alice_public.to_bytes());    
    
    
    assert_eq!(alice_shared, bob_shared); 
    
    println!("shared={}", hex::encode_upper(&alice_shared));   


}

段刚-关于软件保护的若干忠告

来自《加密与解密》第17章
1. 尽量自行开发保护机制,不要依赖任何非自行开发的代码。
在不影响效率的情况下,可以用虚拟机保护软件处理需要保护的核心代码

2.不要依赖壳的保护。加密壳都能被解开或者脱壳。现在许多加密壳转向
虚拟机加密方向就是利用了这一特点。如果时间允许且有相应的技术能力,
可以设计自己的加壳压缩方法。如果利用现成的加壳工具,最好不要选择
流行的工具。保护强度与流行程度成反比。越是流行的工具,就越有可能
由于广泛深入地研究而有了通用的脱壳/解密办法。

3.增加对软件自身完整性的检查,包括对磁盘文件和内存映像的检查,以防止
有人修改程序,进而达到破解的目的。

4. 不要采用一目了然的名字来命令函数和文件,例如
IsLicenseVersion key.data 等。
所有与软件加密相关的字符串都不能以明文的形式直接存放在可执行文件中。
这些字符串最好是动态生成的。

5. 给用户的提示信息越少越好。因为任何蛛丝马迹都可能导致解密者直接
到达核心代码出。 例如,发现破解企图后,不要立即向用户发挥提示信息,
可以在系统的某个地方做一个记号,经过一段随机时间后使软件停止工作。
或者让软件“装作”正常工作,但在所处理的数据中加入一些”垃圾”.

6. 将注册码和安装时间记录在多个地方。

7. 检查注册信息和时间的代码越分散越好。
不要调用同一个函数 或者 判断同一个全局标志。
如果这样做,只要修改一个地方,其他地方就都破解了。

8. 不要通过一些众所周知的API来获取系统时间。可以通过读取关键系统
文件的修改时间来获取系统时间的相关信息。

9. 如果有可能,应采用连网检查注册码的方法,而且数据在网上传输时要加密。

10. 编程时在软件嵌入反跟踪代码,以提高安全性。

11. 在检查注册信息时插入大量无用的运算以误导解密者,并在检查出错误的注册信息后
加入延时机制。

12. 为软件保护增加一定的随机性。例如,除了在启动时检查注册码,还可以在
软件运行的某个时刻随机检查注册码。随机值还可以很好地防范那些模拟工具的解密
,例如软件狗模拟程序。

13. 如果采用注册码的保护方式,最好是一机一码,也就是注册码与机器特征相关。
这样,一台机器上的注册码就无法在另外一台机器上使用。
机器特征要从一些不可修改 的值上获取。也要防止API被hook

14. 如果试用版与正式版是独立的版本,可试用版本软件不具有某项功能,
则不要只禁用相关菜单,而要彻底删除相关代码,是编译后的程序中根本没有
相关的功能代码。

15. 如果软件中包含驱动程序,则最好将保护嗲吗放在驱动程序中。
驱动程序在访问系统资源时受到的限制比普通应用程序少得多,这也给软件设计者
提供了发挥的余地。

16. 如果采用keyfile的保护方式,则keyfile的体积不能太小。可以将其结构设计
得复杂一些,在程序的不同位置,对keyfie的不同部分进行复杂的运算和检查。

17. 自己设计的检查注册信息的算法不能过于简单,最好采用成熟的密码学算法。

设计加密方案时 应该多从解密的角度考虑,这样才能比较合理地运用各种技术。
当然,任何加密方案都无法达到完美的程序。
因此,在设计时,要考虑其他方面的平衡。

ssh密钥类型

ssh -Q key

ssh-ed25519
ssh-ed25519-cert-v01@openssh.com
sk-ssh-ed25519@openssh.com
sk-ssh-ed25519-cert-v01@openssh.com
ecdsa-sha2-nistp256
ecdsa-sha2-nistp256-cert-v01@openssh.com
ecdsa-sha2-nistp384
ecdsa-sha2-nistp384-cert-v01@openssh.com
ecdsa-sha2-nistp521
ecdsa-sha2-nistp521-cert-v01@openssh.com
sk-ecdsa-sha2-nistp256@openssh.com
sk-ecdsa-sha2-nistp256-cert-v01@openssh.com
ssh-dss
ssh-dss-cert-v01@openssh.com
ssh-rsa
ssh-rsa-cert-v01@openssh.com

查询的是key的类型,不是 签名算法的类型
ssh-rsa 都是 rsa的key,但是有三种hash 签名, 分别是 默认的 SHA-1, SHA-256 (rsa-sha2-256) 和 SHA-512 (rsa-sha2-512)
它们key的类型都是 rsa, 但签名算法 不一样。

推荐使用 Ed25519 和 RSA2048, RSA3072, RSA4096

ECDSA 据说可能存在 美国国家安全局的后门
DSA 在 数学上已经证明不可靠

rust jni对Java异常的处理

在CPP中,可以调用 ExceptionOccurred 和 ExceptionCheck 来检查处理异常

jthrowable flag = env->ExceptionOccurred();
{
   /* Handle exception here or free up any resources held
      Exception remains pending until control returns back
      to the Java code.
   */
   return;
}

jboolean flag = env->ExceptionCheck();
if (flag) {
   /* Handle exception here or free up any resources held
      Exception remains pending until control returns back
      to the Java code.
   */
   env->ExceptionClear();



   return;
}

在 rust 的 jni 这个crate里, 上述两个函数,被转换成了 exception_occurred 和 exception_check

在rust调用 call_static_method 或者 call_method, 都不会执行异常处理,需要你自己 处理它们

let result = je.call_static_method(cls, "renderTo", "(II[B)V", &[
    JValue::from(width),
    JValue::from(height),
    JValue::from(rgbs2),
])?;
je.exception_check()?;

在调用 可能抛出Java异常的JNI函数后,总是调用 ExceptionCheck 或 ExceptionOccurred 来检查下异常。
总是检查从 JNI函数返回的值。

rust ipv6 udp reverse echo server and client

server
监听在 udp4 和 6 的端口, buf大小为2048, 超过的会丢弃。
将接受到的 内容 反转后 发回给客户端

use std::net::UdpSocket;


fn main() -> std::io::Result<()> {
    //let socket = UdpSocket::bind("0.0.0.0:2000")?; // for UDP4
    let socket = UdpSocket::bind("[::]:2000")?;  // for UDP4/6
    let mut buf = [0; 2048];

    loop {
        // Receives a single datagram message on the socket.
        // If `buf` is too small to hold
        // the message, it will be cut off.
        let (amt, src) = socket.recv_from(&mut buf)?;

        // Redeclare `buf` as slice of the received data
	// and send data back to origin.
        let buf = &mut buf[..amt];

        //reverse
        buf.reverse();

        socket.send_to(buf, &src)?;
    }
}

client

use std::io::{self, BufRead};
use std::net::UdpSocket;
use std::env;
use std::str;

fn main() -> std::io::Result<()> {

    let args: Vec = env::args().collect();
    if args.len() < 2 {
        println!("Usage {} hostname", args[0]);
        std::process::exit(1);
    }
    let hostname = &args[1];

    let socket = UdpSocket::bind("[::]:0")?;  // for UDP4/6
    
    socket.connect(hostname.to_string() + &":2000").expect("couldn't connect to address");

    // from https://stackoverflow.com/questions/30186037/how-can-i-read-a-single-line-from-stdin
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        println!("Line read from stdin '{}'", line);
        if &line == "BYE" {
            break;
        }

        //socket.send_to(line.as_bytes(), hostname.to_string() + &":2000").expect("Error on send");
        socket.send(line.as_bytes()).expect("Error on send");

        let mut buf = [0; 2048];
        let (amt, _src) = socket.recv_from(&mut buf)?;

        let echo = str::from_utf8(&buf[..amt]).unwrap();
        println!("Echo {}", echo);
    }
    
    Ok(())
}

udp的connect 是可以直接接受 “hostname:port” 字符串形式的参数的, 它会自动进行DNS解析

可以互通的python client

import socket 
import sys 
import argparse 
 
data_payload = 2048 
 
def echo_client(server, port): 
    """ A simple echo client """ 
    # Create a UDP socket 
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 
 
    server_address = (server, port) 
    print ("Connecting to %s port %s" % server_address) 
    message = 'This is the message.  It will be repeated.' 
 
    try: 
 
        # Send data 
        message = "Test message. This will be echoed" 
        print ("Sending= %s" % message) 
        sent = sock.sendto(message.encode('utf-8'), server_address) 
 
        # Receive response 
        data, server = sock.recvfrom(data_payload) 
        print ("received= %s" % data) 
 
    finally: 
        print ("Closing connection to the server") 
        sock.close() 
 
if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description='Socket Server Example') 
    parser.add_argument('--port', action="store", dest="port", type=int, required=True) 
    parser.add_argument('--server', action="store", dest="server", type=str, required=True)     
    given_args = parser.parse_args()  
    port = given_args.port
    server = given_args.server
    
    echo_client(server, port) 

服务端

import socket 
import sys 
import argparse 
 
host = '::' 
data_payload = 2048 

 
def echo_server(port): 
    """ A simple echo server """ 

    # Create a UDP socket 
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 
 
    # Bind the socket to the port 
    server_address = (host, port) 
    print ("Starting up echo server on %s port %s" % server_address) 
 
    sock.bind(server_address) 
 
    while True: 
        print ("Waiting to receive message from client") 
        data, address = sock.recvfrom(data_payload) 
     
        print ("received %s bytes from %s" % (len(data), address)) 
        print ("Data: %s" %data) 
     
        if data: 
            data = data[::-1]  # reverse
            sent = sock.sendto(data, address) 
            print ("sent %s bytes back to %s" % (sent, address)) 
 
 
if __name__ == '__main__': 

    parser = argparse.ArgumentParser(description='Socket Server Example')
    parser.add_argument('--port', action="store", dest="port", type=int, required=True) 
    given_args = parser.parse_args()  
    port = given_args.port 

    echo_server(port) 

nicegui生命周期事件

底层是FastAPI
这些事件来自于 fastapi

from datetime import datetime
from nicegui import app, ui

dt = datetime.now()

def handle_connection():
    global dt
    dt = datetime.now()

app.on_connect(handle_connection)

label = ui.label()
ui.timer(1, lambda: label.set_text(f'Last new connection: {dt:%H:%M:%S}'))

ui.run()

可以注册coroutine或者 function来处理这些事件:
app.on_startup NiceGUI app启动或者重启
app.on_shutdown 关闭或者重启
app.on_connect 当客户端连接上来时 (带一个可选参数 nicegui.Client)
app.on_disconnect 客户端 断开时 (nicegui.Client)
app.on_exception 发生异常时 (可选参数 exception)
当应用关闭时,所有正在执行的任务 都会自动取消