Отслеживание IP-адресов попыток входа в ssh с помощью twisted.conch

Я пытаюсь захватить попытки ssh bruteforce с помощью простого ssh-сервера, код ниже и работает, однако я не могу сопоставить комбинации Username / Password с их исходным IP-адресом.

Это так же просто, как я могу получить его до сих пор, он еще не регистрируется, а просто печатает на stdout. Я использовал определение buildProtocol для распечатки IP-адреса нового соединения, когда оно сделано. Однако я хотел бы получить IP-адрес вместе с учетными данными имени пользователя и пароля, чтобы одновременно отслеживать несколько попыток ssh bruteforce у разных клиентов. В его основе я могу получить IP-адрес только при подключении, что делает отслеживание нескольких подключений одновременно невозможным.

Чтобы уточнить, когда я ссылаюсь на соединение или соединения, я не имею в виду успешный вход в систему, это невозможно с моим кодом и не предназначено. Я имею в виду одно соединение с ssh-сервером, которое позволяет 3 попытки пароля, прежде чем вы будете повторно подключаться.

from zope.interface import implements from twisted.conch.unix import UnixSSHRealm from twisted.cred import portal from twisted.cred.credentials import IUsernamePassword from twisted.cred.checkers import ICredentialsChecker from twisted.cred.error import UnauthorizedLogin from twisted.conch.ssh import factory, userauth, keys, session from twisted.internet import reactor, defer with open('id_rsa') as privateBlobFile: privateKey = privateBlobFile.read() with open('id_rsa.pub') as publicBlobFile: publicKey = publicBlobFile.read() class FailDB: credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker) def requestAvatarId(self, credentials): print"%s:%s" % ( credentials.username, credentials.password) return defer.fail(UnauthorizedLogin("invalid password")) class UnixSSHdFactory(factory.SSHFactory): publicKeys = { 'ssh-rsa': keys.Key.fromString(data=publicKey) } privateKeys = { 'ssh-rsa': keys.Key.fromString(data=privateKey) } services = { 'ssh-userauth': userauth.SSHUserAuthServer } def buildProtocol(self, addr): print addr return factory.SSHFactory.buildProtocol(self, addr) if __name__ == '__main__': portal = portal.Portal(UnixSSHRealm()) portal.registerChecker(FailDB()) UnixSSHdFactory.portal = portal reactor.listenTCP(2022, UnixSSHdFactory()) reactor.run() 

Пример вывода:

 IPv4Address(TCP, '127.0.0.1', 42141) root:password123 root:123456 root:letmein 

Вы можете получить адрес из неудачной попытки входа в систему, SSHUserAuthServer из SSHUserAuthServer и переопределив метод _ebBadAuth . Пример:

 class AuthServer(userauth.SSHUserAuthServer): def _ebBadAuth(self, reason): addr = self.transport.getPeer().address.host print("addr {} failed to log in as {} using {}" .format(addr, self.user, self.method)) userauth.SSHUserAuthServer._ebBadAuth(self, reason) 

Если вы также захотите, чтобы соединения были успешными, вам не придется переопределять SSHFactory.services , но только обновлять их с помощью производного класса, например factory.services.update({'ssh-userauth': AuthServer}) .