asyncspammer — это мини-библиотека на Go для построения конвейеров (pipeline), где этапы обмениваются данными через каналы.
Проект демонстрирует принципы конкурентного программирования, работу с sync.WaitGroup, семафорами и потоковую обработку данных без промежуточного накопления.
- Универсальная функция
RunPipelineдля организации последовательной конвейерной обработки. - Безопасное закрытие каналов без дедлоков.
- Контролируемый уровень параллелизма (через семафоры).
- Поддержка батч-обработки.
- Стриминговая передача данных между стадиями.
- Детерминированный порядок вывода.
emails → SelectUsers → SelectMessages → CheckSpam → CombineResults → output
Каждая стадия принимает канал на вход (in <-chan interface{})
и возвращает канал на выход (out chan interface{}).
in-stringвнутриinterface{}. это имейлы юзеровout- отдает структуркиUser{}. это результат функцииGetUser()- особенности:
GetUser()выполняется 1 секунду. его можно вызывать параллельно для нескольких юзеров. это экономит время- у некоторых юзеров есть alias'ы(псевдонимы). то есть на вид это два разных имейла, но на самом деле это один и тот же юзер в базе. например "batman@mail.ru" - это алиас к "bruce.wayne@mail.ru".
SelectUsers()должен отдавать вoutтолько уникальных юзеров.
in-User{}внутриinterface{}отSelectUsers()out- отдаетMsgID. это айдишники писем юзеров - результат функцииGetMessages()- особенности:
GetMessages()выполняется 1 секунду. его тоже можно вызывать параллельно.- но
GetMessages()позволяет использовать "батчи". то есть за раз в нее можно запихнуть не 1 юзера, а несколько. максимальное кол-во юзеров = 2. то есть если мы хотим селектнуть у 10ти юзеров письма, то это можно сделать за 5 вызововGetMessages(). в тестах проверяется, что кол-во вызовов оптимальное
in-MsgIDвнутриinterface{}. это айдишники писемout-MsgData{}. это структура с парой полей: id и факт того является ли письмо спамом. это результат работыHasSpam()- особенности:
HasSpam()симулирует поход в сервис антиспама, чтоб проверить письмо на наличие спама. один запрос выполняется за 100мс. и у этого сервиса есть "антибрут" - его нельзя вызывать бесконтрольно в кучу потоков. если сделать к нему более 5 параллельных запросов, то он начнет возвращать ошибку и данные о наличии спама вы не получите.
in-MsgDataвнутриinterface{}out-string. это строки вида "<has_spam> <msg_id>", например "true 17696166526272393238"- особенности:
-
CombineResults()ждет все результаты изin, а потом сортирует их по наличию спама и по msg_id. то есть пример вывода может быть такой:true 123 true 5555 true 5556 false 140 false 3000 false 3005
-
Запускает цепочку стадий и последовательно связывает их каналы.
Каждая стадия работает в отдельной горутине и закрывает out после завершения.
RunPipeline(
SelectUsers,
SelectMessages,
CheckSpam,
CombineResults,
)go test -v -race