Отслеживание IP-адресов попыток входа в ssh с помощью twisted.conch
Я пытаюсь захватить попытки ssh bruteforce с помощью простого ssh-сервера, код ниже и работает, однако я не могу сопоставить комбинации Username / Password с их исходным IP-адресом.
Это так же просто, как я могу получить его до сих пор, он еще не регистрируется, а просто печатает на stdout. Я использовал определение buildProtocol для распечатки IP-адреса нового соединения, когда оно сделано. Однако я хотел бы получить IP-адрес вместе с учетными данными имени пользователя и пароля, чтобы одновременно отслеживать несколько попыток ssh bruteforce у разных клиентов. В его основе я могу получить IP-адрес только при подключении, что делает отслеживание нескольких подключений одновременно невозможным.
Чтобы уточнить, когда я ссылаюсь на соединение или соединения, я не имею в виду успешный вход в систему, это невозможно с моим кодом и не предназначено. Я имею в виду одно соединение с ssh-сервером, которое позволяет 3 попытки пароля, прежде чем вы будете повторно подключаться.
from zope.interface import implements from twisted.conch.unix import UnixSSHRealm from twisted.cred import portal from twisted.cred.credentials import IUsernamePassword from twisted.cred.checkers import ICredentialsChecker from twisted.cred.error import UnauthorizedLogin from twisted.conch.ssh import factory, userauth, keys, session from twisted.internet import reactor, defer with open('id_rsa') as privateBlobFile: privateKey = privateBlobFile.read() with open('id_rsa.pub') as publicBlobFile: publicKey = publicBlobFile.read() class FailDB: credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker) def requestAvatarId(self, credentials): print"%s:%s" % ( credentials.username, credentials.password) return defer.fail(UnauthorizedLogin("invalid password")) class UnixSSHdFactory(factory.SSHFactory): publicKeys = { 'ssh-rsa': keys.Key.fromString(data=publicKey) } privateKeys = { 'ssh-rsa': keys.Key.fromString(data=privateKey) } services = { 'ssh-userauth': userauth.SSHUserAuthServer } def buildProtocol(self, addr): print addr return factory.SSHFactory.buildProtocol(self, addr) if __name__ == '__main__': portal = portal.Portal(UnixSSHRealm()) portal.registerChecker(FailDB()) UnixSSHdFactory.portal = portal reactor.listenTCP(2022, UnixSSHdFactory()) reactor.run()
Пример вывода:
IPv4Address(TCP, '127.0.0.1', 42141) root:password123 root:123456 root:letmein
- Как удаленно выполнить процесс с помощью python
- tkinter.TclError: не удалось подключиться к отображению "localhost: 18.0"
- Терминал зависает после sshing через подпроцесс python
- Парамико закрывает ssh-соединение при исключении непарамико
- библиотеки python для обработки ssh
Вы можете получить адрес из неудачной попытки входа в систему, SSHUserAuthServer
из SSHUserAuthServer
и переопределив метод _ebBadAuth
. Пример:
class AuthServer(userauth.SSHUserAuthServer): def _ebBadAuth(self, reason): addr = self.transport.getPeer().address.host print("addr {} failed to log in as {} using {}" .format(addr, self.user, self.method)) userauth.SSHUserAuthServer._ebBadAuth(self, reason)
Если вы также захотите, чтобы соединения были успешными, вам не придется переопределять SSHFactory.services
, но только обновлять их с помощью производного класса, например factory.services.update({'ssh-userauth': AuthServer})
.
- Как я могу поймать / кроме ошибок регистрации в Django?
- как преобразовать карту скоростей в карту потока жидкости
- Есть ли способ использовать telnet в ssh-соединении в Robot Framework?
- Как добавить backspace в команду терминала Linux?
- Использование matplotlib на безголовом сервере Ubuntu 14.04
- Библиотека Python для планирования работы, ssh
- Объемный абзац Emacs для Python
- AttributeError: объект «NoneType» не имеет атрибута «open_session» »
- scp между двумя удаленными хостами – без пароля
- Модуль SSH для python
- Как подключить удаленный mongodb с pymongo