Help Center

How to send an email campaign using MailChannels Email API

If you're familiar with Twilio SendGrid's marketing API, you're likely familiar with their excellent Marketing Campaigns workflow: create a list, start a CSV import job, upload the CSV to the returned upload_uri with the required upload_headers, optionally poll the job_id until the import finishes, then create a Single Send targeted at send_to.list_ids and schedule it with send_at: "now". SendGrid’s CSV import uses field-definition IDs for field_mappings, and the field definitions endpoint returns both custom and reserved fields. (Twilio)

MailChannels Email API does not currently have dedicated marketing campaign endpoints yet, but that doesn't mean you can't send from lists. Instead, we recommend streaming messages directly from your CSV directly into Email API in 500-recipient batches using personalizations, posting those batches to the documented /send-async path (it's really fast), and relying on our documented Email API message structure, Mustache templating, DKIM signing, and unsubscribe supports to fill in the gaps. For non-transactional mail, MailChannels requires that each personalization must contain exactly one recipient and the message must be DKIM-signed; we also support mc-unsubscribe-url in Mustache templates and says send-async queues the work and returns immediately with a request ID. (MailChannels Documentation)

In this article, we illustrate with code the difference between sending to a marketing list using Twilio SendGrid vs. MailChannels Email API.

Twilio SendGrid: upload CSV into a stored list, then send a Single Send to that list

# pip install requests

import csv
import os
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests


SENDGRID_BASE_URL = os.getenv("SENDGRID_BASE_URL", "https://api.sendgrid.com/v3")
SENDGRID_API_KEY = os.environ["SENDGRID_API_KEY"]

CSV_PATH = Path(os.getenv("CSV_PATH", "contacts.csv"))
LIST_NAME = os.getenv("LIST_NAME", f"newsletter-{int(time.time())}")
CAMPAIGN_NAME = os.getenv("CAMPAIGN_NAME", f"{LIST_NAME}-campaign")

# These must exist in your SendGrid account already.
SENDGRID_SENDER_ID = int(os.environ["SENDGRID_SENDER_ID"])
SENDGRID_SUPPRESSION_GROUP_ID = int(os.environ["SENDGRID_SUPPRESSION_GROUP_ID"])

SUBJECT = os.getenv("SUBJECT", "April Product Update")
HTML_CONTENT = os.getenv(
    "HTML_CONTENT",
    """
    <html>
      <body>
        <h1>April Product Update</h1>
        <p>We shipped several improvements this month.</p>
        <p>Thanks for being with us.</p>
      </body>
    </html>
    """.strip(),
)
PLAIN_CONTENT = os.getenv(
    "PLAIN_CONTENT",
    "April Product Update\n\nWe shipped several improvements this month.\n\nThanks for being with us.",
)

POLL_INTERVAL_SECONDS = float(os.getenv("POLL_INTERVAL_SECONDS", "2.0"))

IDENTIFIER_FIELDS = {"email", "phone_number_id", "external_id", "anonymous_id"}
COMMON_HEADER_ALIASES = {
    "firstname": "first_name",
    "first_name": "first_name",
    "first name": "first_name",
    "lastname": "last_name",
    "last_name": "last_name",
    "last name": "last_name",
    "email address": "email",
    "email_address": "email",
    "e-mail": "email",
}


def normalize_header(value: str) -> str:
    normalized = "_".join(value.strip().lower().replace("-", " ").split())
    return COMMON_HEADER_ALIASES.get(normalized, normalized)


def build_session() -> requests.Session:
    session = requests.Session()
    session.headers.update(
        {
            "Authorization": f"Bearer {SENDGRID_API_KEY}",
            "Content-Type": "application/json",
        }
    )
    return session


SESSION = build_session()


def sg_request(method: str, path: str, **kwargs) -> requests.Response:
    url = f"{SENDGRID_BASE_URL}{path}"
    timeout = kwargs.pop("timeout", 60)
    response = SESSION.request(method=method, url=url, timeout=timeout, **kwargs)
    try:
        response.raise_for_status()
    except requests.HTTPError as exc:
        raise RuntimeError(
            f"SendGrid {method} {path} failed "
            f"with {response.status_code}: {response.text}"
        ) from exc
    return response


def read_csv_headers(csv_path: Path) -> List[str]:
    with csv_path.open("r", encoding="utf-8-sig", newline="") as handle:
        reader = csv.reader(handle)
        try:
            return next(reader)
        except StopIteration as exc:
            raise RuntimeError(f"{csv_path} is empty.") from exc


def get_field_definition_map() -> Dict[str, str]:
    """
    Returns a map like:
      {
        "email": "_rf2_T",
        "first_name": "_rf0_T",
        "my_custom_field": "w1",
      }
    """
    data = sg_request("GET", "/marketing/field_definitions").json()
    name_to_id: Dict[str, str] = {}

    for field in data.get("reserved_fields", []):
        name_to_id[normalize_header(field["name"])] = field["id"]

    for field in data.get("custom_fields", []):
        name_to_id[normalize_header(field["name"])] = field["id"]

    return name_to_id


def build_field_mappings(csv_headers: List[str], field_defs: Dict[str, str]) -> Tuple[List[Optional[str]], List[str]]:
    normalized_headers = [normalize_header(h) for h in csv_headers]

    if not any(h in IDENTIFIER_FIELDS for h in normalized_headers):
        raise RuntimeError(
            "CSV must contain at least one SendGrid contact identifier column, "
            "such as email, phone_number_id, external_id, or anonymous_id."
        )

    mappings: List[Optional[str]] = []
    skipped: List[str] = []

    for original_header, normalized_header in zip(csv_headers, normalized_headers):
        field_id = field_defs.get(normalized_header)
        if field_id is None:
            mappings.append(None)  # SendGrid accepts null to skip a column
            skipped.append(original_header)
        else:
            mappings.append(field_id)

    return mappings, skipped


def create_list(list_name: str) -> str:
    payload = {"name": list_name}
    response = sg_request("POST", "/marketing/lists", json=payload).json()
    return response["id"]


def start_import_job(list_id: str, field_mappings: List[Optional[str]]) -> Tuple[str, str, Dict[str, str]]:
    payload = {
        "list_ids": [list_id],
        "file_type": "csv",
        "field_mappings": field_mappings,
    }
    response = sg_request("PUT", "/marketing/contacts/imports", json=payload).json()

    upload_headers = {
        header_obj["header"]: header_obj["value"]
        for header_obj in response.get("upload_headers", [])
    }

    return response["job_id"], response["upload_uri"], upload_headers


def upload_csv_to_signed_url(csv_path: Path, upload_uri: str, upload_headers: Dict[str, str]) -> None:
    headers = dict(upload_headers)
    headers.setdefault("Content-Length", str(csv_path.stat().st_size))

    with csv_path.open("rb") as handle:
        response = requests.put(upload_uri, headers=headers, data=handle, timeout=300)
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            raise RuntimeError(
                f"CSV upload to SendGrid import URL failed "
                f"with {response.status_code}: {response.text}"
            ) from exc


def wait_for_import(job_id: str) -> dict:
    while True:
        response = sg_request("GET", f"/marketing/contacts/imports/{job_id}").json()
        status = response["status"]

        if status in {"completed", "errored"}:
            return response
        if status == "failed":
            raise RuntimeError(
                f"SendGrid import job failed: {response}. "
                f"Check errors_url if present."
            )

        time.sleep(POLL_INTERVAL_SECONDS)


def create_single_send(list_id: str) -> str:
    payload = {
        "name": CAMPAIGN_NAME,
        "send_to": {"list_ids": [list_id]},
        "email_config": {
            "subject": SUBJECT,
            "html_content": HTML_CONTENT,
            "plain_content": PLAIN_CONTENT,
            "generate_plain_content": False,
            "editor": "code",
            "suppression_group_id": SENDGRID_SUPPRESSION_GROUP_ID,
            "sender_id": SENDGRID_SENDER_ID,
        },
    }

    response = sg_request("POST", "/marketing/singlesends", json=payload).json()
    return response["id"]


def schedule_single_send_now(single_send_id: str) -> None:
    sg_request(
        "PUT",
        f"/marketing/singlesends/{single_send_id}/schedule",
        json={"send_at": "now"},
    )


def main() -> None:
    if not CSV_PATH.exists():
        raise FileNotFoundError(f"CSV file not found: {CSV_PATH}")

    csv_headers = read_csv_headers(CSV_PATH)
    field_defs = get_field_definition_map()
    field_mappings, skipped_headers = build_field_mappings(csv_headers, field_defs)

    if skipped_headers:
        print("Skipping CSV columns with no matching SendGrid field definition:")
        for header in skipped_headers:
            print(f"  - {header}")

    print(f"Creating list: {LIST_NAME}")
    list_id = create_list(LIST_NAME)
    print(f"Created list_id={list_id}")

    print("Creating import job...")
    job_id, upload_uri, upload_headers = start_import_job(list_id, field_mappings)
    print(f"Import job started: job_id={job_id}")

    print("Uploading CSV...")
    upload_csv_to_signed_url(CSV_PATH, upload_uri, upload_headers)

    print("Waiting for import to finish...")
    import_status = wait_for_import(job_id)
    print(f"Import finished with status={import_status['status']}")

    results = import_status.get("results", {})
    if results:
        print("Import results:")
        print(f"  requested_count={results.get('requested_count')}")
        print(f"  created_count={results.get('created_count')}")
        print(f"  updated_count={results.get('updated_count')}")
        print(f"  errored_count={results.get('errored_count')}")
        if results.get("errors_url"):
            print(f"  errors_url={results['errors_url']}")

    print("Creating Single Send...")
    single_send_id = create_single_send(list_id)
    print(f"Created single_send_id={single_send_id}")

    print("Scheduling Single Send for now...")
    schedule_single_send_now(single_send_id)
    print("Campaign scheduled.")


if __name__ == "__main__":
    main()
 

MailChannels: stream the CSV and enqueue 500-recipient batches to /send-async

This version uses our fast async sending path /send-async, so the POST URL is https://api.mailchannels.net/tx/v1/send-async. The payload uses the documented Email API structure (personalizationsfromsubjectcontent) with Mustache templates, transactional: false, DKIM fields, and one recipient per personalization. (MailChannels Documentation)
 

# pip install requests

import csv
import os
from pathlib import Path
from typing import Dict, Iterator, List

import requests


CSV_PATH = Path(os.getenv("CSV_PATH", "contacts.csv"))
MC_SEND_ASYNC_URL = os.getenv(
    "MC_SEND_ASYNC_URL",
    "https://api.mailchannels.net/tx/v1/send-async",
)
MC_API_KEY = os.environ["MC_API_KEY"]

FROM_EMAIL = os.environ["MC_FROM_EMAIL"]
FROM_NAME = os.getenv("MC_FROM_NAME", "Marketing Team")
SUBJECT = os.getenv("SUBJECT", "April Product Update")

# Required for non-transactional / marketing mail in MailChannels.
MC_DKIM_DOMAIN = os.environ["MC_DKIM_DOMAIN"]
MC_DKIM_SELECTOR = os.environ["MC_DKIM_SELECTOR"]
MC_DKIM_PRIVATE_KEY_B64 = os.environ["MC_DKIM_PRIVATE_KEY_B64"]

BATCH_SIZE = 500

TEXT_TEMPLATE = """\
Hi {{first_name}},

We shipped several improvements this month.

To unsubscribe, visit: {{mc-unsubscribe-url}}
"""

HTML_TEMPLATE = """\
<html>
  <body>
    <h1>Hi {{first_name}},</h1>
    <p>We shipped several improvements this month.</p>
    <p>
      <a href="{{mc-unsubscribe-url}}">Unsubscribe</a>
    </p>
  </body>
</html>
"""

COMMON_HEADER_ALIASES = {
    "firstname": "first_name",
    "first_name": "first_name",
    "first name": "first_name",
    "lastname": "last_name",
    "last_name": "last_name",
    "last name": "last_name",
    "email address": "email",
    "email_address": "email",
    "e-mail": "email",
}


def normalize_header(value: str) -> str:
    normalized = "_".join(value.strip().lower().replace("-", " ").split())
    return COMMON_HEADER_ALIASES.get(normalized, normalized)


def normalize_row(row: Dict[str, str]) -> Dict[str, str]:
    normalized: Dict[str, str] = {}
    for key, value in row.items():
        if key is None:
            continue
        normalized[normalize_header(key)] = (value or "").strip()
    return normalized


def iter_csv_rows(csv_path: Path) -> Iterator[Dict[str, str]]:
    with csv_path.open("r", encoding="utf-8-sig", newline="") as handle:
        reader = csv.DictReader(handle)
        if not reader.fieldnames:
            raise RuntimeError(f"{csv_path} is empty or missing a header row.")

        for raw_row in reader:
            row = normalize_row(raw_row)
            if not row.get("email"):
                continue  # skip blank or malformed rows
            yield row


def make_recipient_name(row: Dict[str, str]) -> str:
    parts = [row.get("first_name", ""), row.get("last_name", "")]
    return " ".join(part for part in parts if part).strip()


def row_to_personalization(row: Dict[str, str]) -> Dict[str, object]:
    recipient = {"email": row["email"]}
    recipient_name = make_recipient_name(row)
    if recipient_name:
        recipient["name"] = recipient_name

    return {
        # Exactly one recipient per personalization, as required for
        # non-transactional / marketing mail.
        "to": [recipient],
        "dynamic_template_data": {
            "first_name": row.get("first_name") or "there",
        },
    }


def batched(iterator: Iterator[Dict[str, object]], batch_size: int) -> Iterator[List[Dict[str, object]]]:
    batch: List[Dict[str, object]] = []
    for item in iterator:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def build_payload(personalizations: List[Dict[str, object]]) -> Dict[str, object]:
    return {
        "transactional": False,
        "from": {
            "email": FROM_EMAIL,
            "name": FROM_NAME,
        },
        "subject": SUBJECT,
        "content": [
            {
                "type": "text/plain",
                "value": TEXT_TEMPLATE,
                "template_type": "mustache",
            },
            {
                "type": "text/html",
                "value": HTML_TEMPLATE,
                "template_type": "mustache",
            },
        ],
        "dkim_domain": MC_DKIM_DOMAIN,
        "dkim_selector": MC_DKIM_SELECTOR,
        "dkim_private_key": MC_DKIM_PRIVATE_KEY_B64,
        "personalizations": personalizations,
    }


def queue_batch(session: requests.Session, personalizations: List[Dict[str, object]]) -> Dict[str, object]:
    payload = build_payload(personalizations)
    response = session.post(MC_SEND_ASYNC_URL, json=payload, timeout=60)

    try:
        response.raise_for_status()
    except requests.HTTPError as exc:
        raise RuntimeError(
            f"MailChannels send-async failed with "
            f"{response.status_code}: {response.text}"
        ) from exc

    try:
        return response.json()
    except ValueError:
        return {"raw_response": response.text}


def main() -> None:
    if not CSV_PATH.exists():
        raise FileNotFoundError(f"CSV file not found: {CSV_PATH}")

    session = requests.Session()
    session.headers.update(
        {
            "X-Api-Key": MC_API_KEY,
            "Content-Type": "application/json",
        }
    )

    personalizations_iter = (row_to_personalization(row) for row in iter_csv_rows(CSV_PATH))

    total_recipients = 0
    total_batches = 0

    for batch_number, personalization_batch in enumerate(
        batched(personalizations_iter, BATCH_SIZE),
        start=1,
    ):
        result = queue_batch(session, personalization_batch)
        total_batches += 1
        total_recipients += len(personalization_batch)

        request_id = result.get("request_id") if isinstance(result, dict) else None
        print(
            f"Queued batch {batch_number}: "
            f"{len(personalization_batch)} recipients"
            + (f", request_id={request_id}" if request_id else "")
        )

    print(f"Done. Queued {total_recipients} recipients across {total_batches} batch(es).")


if __name__ == "__main__":
    main()

The practical difference is straightforward: Twilio SendGrid persists the audience as a marketing list and then targets that stored list_id with a Single Send, while the MailChannels example treats the CSV as the source of truth and pushes 500 personalized recipients at a time directly into send-async. (Twilio)

Was this article helpful?
0 out of 0 found this helpful
Have more questions? Submit a request

Comments

Please sign in to leave a comment.