Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libp2p/security/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@

class HandshakeFailure(BaseLibp2pError):
pass


class SecurityError(BaseLibp2pError):
pass
36 changes: 36 additions & 0 deletions libp2p/security/tls/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
TLS security transport for libp2p.

This module provides a comprehensive TLS transport implementation
that follows the Go libp2p TLS specification.
"""

from libp2p.security.tls.transport import (
TLSTransport,
IdentityConfig,
create_tls_transport,
PROTOCOL_ID
)
from libp2p.security.tls.io import TLSReadWriter
from libp2p.security.tls.certificate import (
generate_certificate,
create_cert_template,
verify_certificate_chain,
pub_key_from_cert_chain,
SignedKey,
ALPN_PROTOCOL
)

__all__ = [
"TLSTransport",
"IdentityConfig",
"TLSReadWriter",
"create_tls_transport",
"generate_certificate",
"create_cert_template",
"verify_certificate_chain",
"pub_key_from_cert_chain",
"SignedKey",
"PROTOCOL_ID",
"ALPN_PROTOCOL"
]
120 changes: 120 additions & 0 deletions libp2p/security/tls/certificate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
TLS certificate utilities for libp2p.

This module provides certificate generation and verification functions
that embed libp2p peer identity information in X.509 extensions.
"""

from dataclasses import dataclass

from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import ObjectIdentifier

from libp2p.crypto.keys import PrivateKey, PublicKey

# ALPN protocol for libp2p TLS
ALPN_PROTOCOL = "libp2p"

# Custom OID for libp2p peer identity extension (same as Rust implementation)
LIBP2P_EXTENSION_OID = ObjectIdentifier("1.3.6.1.4.1.53594.1.1")


@dataclass
class SignedKey:
"""Represents a signed public key embedded in certificate extension."""

public_key_bytes: bytes
signature: bytes


def create_cert_template() -> x509.CertificateBuilder:
"""
Create a certificate template for libp2p TLS certificates.

Returns:
Certificate builder template

"""
raise NotImplementedError("TLS certificate template creation not implemented")


def add_libp2p_extension(
cert_builder: x509.CertificateBuilder, peer_public_key: PublicKey, signature: bytes
) -> x509.CertificateBuilder:
"""
Add libp2p peer identity extension to certificate.

Args:
cert_builder: Certificate builder to modify
peer_public_key: Peer's public key to embed
signature: Signature over the certificate's public key

Returns:
Certificate builder with libp2p extension

"""
raise NotImplementedError("libp2p extension addition not implemented")


def generate_certificate(
private_key: PrivateKey, cert_template: x509.CertificateBuilder
) -> tuple[str, str]:
"""
Generate a self-signed certificate with libp2p extensions.

Args:
private_key: Private key for signing
cert_template: Certificate template

Returns:
Tuple of (certificate PEM, private key PEM)

"""
raise NotImplementedError("Certificate generation not implemented")


def verify_certificate_chain(cert_chain: list[x509.Certificate]) -> PublicKey:
"""
Verify certificate chain and extract peer public key from libp2p extension.

Args:
cert_chain: List of certificates in the chain

Returns:
Public key from libp2p extension

Raises:
SecurityError: If verification fails

"""
raise NotImplementedError("Certificate chain verification not implemented")


def pub_key_from_cert_chain(cert_chain: list[x509.Certificate]) -> PublicKey:
"""
Extract public key from certificate chain.

This is an alias for verify_certificate_chain for compatibility.

Args:
cert_chain: Certificate chain

Returns:
Public key

"""
return verify_certificate_chain(cert_chain)


def generate_self_signed_cert() -> tuple[ec.EllipticCurvePrivateKey, x509.Certificate]:
"""
Generate a self-signed certificate for testing purposes.

This is a utility function based on the guide examples.

Returns:
Tuple of (private key, certificate)

"""
raise NotImplementedError("Self-signed certificate generation not implemented")
133 changes: 133 additions & 0 deletions libp2p/security/tls/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""
TLS I/O utilities for libp2p.

This module provides TLS-specific message reading and writing functionality,
similar to how noise handles encrypted communication.
"""

import ssl

from cryptography import x509

from libp2p.abc import IRawConnection
from libp2p.io.abc import EncryptedMsgReadWriter


class TLSReadWriter(EncryptedMsgReadWriter):
"""
TLS encrypted message reader/writer.

This class handles TLS encryption/decryption over a raw connection,
similar to NoiseTransportReadWriter in the noise implementation.
"""

def __init__(
self,
conn: IRawConnection,
ssl_context: ssl.SSLContext,
server_side: bool = False,
server_hostname: str | None = None,
):
"""
Initialize TLS reader/writer.

Args:
conn: Raw connection to wrap
ssl_context: SSL context for TLS operations
server_side: Whether to act as TLS server
server_hostname: Server hostname for client connections

"""
self.raw_connection = conn
self.ssl_context = ssl_context
self.server_side = server_side
self.server_hostname = server_hostname
self._ssl_socket = None
self._peer_certificate: x509.Certificate | None = None
self._handshake_complete = False

async def handshake(self) -> None:
"""
Perform TLS handshake.

Raises:
HandshakeFailure: If handshake fails

"""
raise NotImplementedError("TLS handshake not implemented")

def get_peer_certificate(self) -> x509.Certificate | None:
"""
Get the peer's certificate after handshake.

Returns:
Peer certificate or None if not available

"""
return self._peer_certificate

async def write_msg(self, msg: bytes) -> None:
"""
Write an encrypted message.

Args:
msg: Message to encrypt and send

"""
raise NotImplementedError("TLS write_msg not implemented")

async def read_msg(self) -> bytes:
"""
Read and decrypt a message.

Returns:
Decrypted message bytes

"""
raise NotImplementedError("TLS read_msg not implemented")

def encrypt(self, data: bytes) -> bytes:
"""
Encrypt data for transmission.

Args:
data: Data to encrypt

Returns:
Encrypted data

"""
# In TLS, encryption is handled at the SSL layer during write_msg
# This method exists for interface compatibility
return data

def decrypt(self, data: bytes) -> bytes:
"""
Decrypt received data.

Args:
data: Encrypted data to decrypt

Returns:
Decrypted data

"""
# In TLS, decryption is handled at the SSL layer during read_msg
# This method exists for interface compatibility
return data

async def close(self) -> None:
"""Close the TLS connection."""
raise NotImplementedError("TLS close not implemented")

def get_remote_address(self) -> tuple[str, int] | None:
"""
Get remote address from underlying connection.

Returns:
Remote address tuple or None

"""
if hasattr(self.raw_connection, "get_remote_address"):
return self.raw_connection.get_remote_address()
return None
Loading