import * as Sentry from "@sentry/node";
import BullQueue from "bull";
import { addSeconds, differenceInSeconds } from "date-fns";
import { isArray, isEmpty, isNil } from "lodash";
import moment from "moment";
import path from "path";
import { Op, QueryTypes } from "sequelize";
import sequelize from "./database";
import GetDefaultWhatsApp from "./helpers/GetDefaultWhatsApp";
import GetWhatsappWbot from "./helpers/GetWhatsappWbot";
import formatBody from "./helpers/Mustache";
import { MessageData, SendMessage } from "./helpers/SendMessage";
import { getIO } from "./libs/socket";
import Campaign from "./models/Campaign";
import CampaignSetting from "./models/CampaignSetting";
import CampaignShipping from "./models/CampaignShipping";
import Company from "./models/Company";
import Contact from "./models/Contact";
import ContactList from "./models/ContactList";
import ContactListItem from "./models/ContactListItem";
import Plan from "./models/Plan";
import Schedule from "./models/Schedule";
import User from "./models/User";
import Whatsapp from "./models/Whatsapp";
import ShowFileService from "./services/FileServices/ShowService";
import { getMessageOptions } from "./services/WbotServices/SendWhatsAppMedia";
import { ClosedAllOpenTickets } from "./services/WbotServices/wbotClosedTickets";
import { logger } from "./utils/logger";
const nodemailer = require('nodemailer');
const CronJob = require('cron').CronJob;
// Redis connection string passed to every Bull queue created in this module.
const connection = process.env.REDIS_URI || "";
// Limiter settings for the message queue (`max` jobs per `duration`, see the
// queue options below). Environment variables are always strings, so they are
// parsed explicitly; unset, empty, non-numeric or zero values fall back to the
// defaults (1 and 3000).
const limiterMax = Number(process.env.REDIS_OPT_LIMITER_MAX) || 1;
const limiterDuration = Number(process.env.REDIS_OPT_LIMITER_DURATION) || 3000;
/** Payload of a "ProcessCampaign" job on the campaign queue. */
interface ProcessCampaignData {
  /** Primary key of the campaign to process. */
  id: number;
  /** Milliseconds between the moment the job was enqueued and the campaign's `scheduledAt`. */
  delay: number;
}

/** Payload of a "PrepareContact" job on the campaign queue. */
interface PrepareContactData {
  /** Primary key of the contact-list item that will receive the message. */
  contactId: number;
  /** Primary key of the campaign being sent. */
  campaignId: number;
  /** Delay, in milliseconds, applied to the follow-up "DispatchCampaign" job. */
  delay: number;
  /** Campaign variables (objects with `key` and `value`) substituted into the message text. */
  variables: any[];
}

/** Payload of a "DispatchCampaign" job on the campaign queue. */
interface DispatchCampaignData {
  /** Primary key of the campaign being sent. */
  campaignId: number;
  /** Primary key of the CampaignShipping record holding the prepared message. */
  campaignShippingId: number;
  /** Primary key of the contact-list item being messaged. */
  contactListItemId: number;
}
/** Queue that carries the repeating "VerifyLoginStatus" job. */
export const userMonitor = new BullQueue("UserMonitor", connection);

/** Queue that carries the repeating "VerifyQueueStatus" job. */
export const queueMonitor = new BullQueue("QueueMonitor", connection);

/** Outbound message queue, created with the limiter configured from the environment. */
export const messageQueue = new BullQueue("MessageQueue", connection, {
  limiter: {
    max: limiterMax as number,
    duration: limiterDuration as number
  }
});

/** Queue that carries the repeating "Verify" job, which looks for schedules that are about to fall due. */
export const scheduleMonitor = new BullQueue("ScheduleMonitor", connection);

/**
 * Queue whose "SendMessage" jobs deliver scheduled messages.
 * NOTE(review): the queue name is misspelled ("Sacheduled"). It is a runtime
 * identifier for the queue, so it is left as-is; renaming it would presumably
 * strand jobs already stored under the old name — confirm before changing.
 */
export const sendScheduledMessages = new BullQueue(
  "SendSacheduledMessages",
  connection
);

/** Queue for the campaign pipeline: VerifyCampaigns, ProcessCampaign, PrepareContact, DispatchCampaign. */
export const campaignQueue = new BullQueue("CampaignQueue", connection);
/**
 * Processor for "SendMessage" jobs on the message queue.
 *
 * Looks up the Whatsapp record named by `job.data.whatsappId` and sends
 * `job.data.data` through it.
 *
 * @param job Bull job whose `data` holds `whatsappId` and the message `data`.
 * @throws Rethrows any failure (after reporting it) so the job is marked as failed.
 */
async function handleSendMessage(job) {
  try {
    const payload = job.data;
    const whatsapp = await Whatsapp.findByPk(payload.whatsappId);

    if (whatsapp === null || whatsapp === undefined) {
      throw Error("Whatsapp não identificado");
    }

    const messageData: MessageData = payload.data;
    await SendMessage(whatsapp, messageData);
  } catch (e: any) {
    Sentry.captureException(e);
    logger.error("MessageQueue -> SendMessage: error", e.message);
    throw e;
  }
}
{/*async function handleVerifyQueue(job) {
logger.info("Buscando atendimentos perdidos nas filas");
try {
const companies = await Company.findAll({
attributes: ['id', 'name'],
where: {
status: true,
dueDate: {
[Op.gt]: Sequelize.literal('CURRENT_DATE')
}
},
include: [
{
model: Whatsapp, attributes: ["id", "name", "status", "timeSendQueue", "sendIdQueue"], where: {
timeSendQueue: {
[Op.gt]: 0
}
}
},
]
}); */}
{/* companies.map(async c => {
c.whatsapps.map(async w => {
if (w.status === "CONNECTED") {
var companyId = c.id;
const moveQueue = w.timeSendQueue ? w.timeSendQueue : 0;
const moveQueueId = w.sendIdQueue;
const moveQueueTime = moveQueue;
const idQueue = moveQueueId;
const timeQueue = moveQueueTime;
if (moveQueue > 0) {
if (!isNaN(idQueue) && Number.isInteger(idQueue) && !isNaN(timeQueue) && Number.isInteger(timeQueue)) {
const tempoPassado = moment().subtract(timeQueue, "minutes").utc().format();
// const tempoAgora = moment().utc().format();
const { count, rows: tickets } = await Ticket.findAndCountAll({
where: {
status: "pending",
queueId: null,
companyId: companyId,
whatsappId: w.id,
updatedAt: {
[Op.lt]: tempoPassado
}
},
include: [
{
model: Contact,
as: "contact",
attributes: ["id", "name", "number", "email", "profilePicUrl"],
include: ["extraInfo"]
}
]
});
if (count > 0) {
tickets.map(async ticket => {
await ticket.update({
queueId: idQueue
});
await ticket.reload();
const io = getIO();
io.to(ticket.status)
.to("notification")
.to(ticket.id.toString())
.emit(`company-${companyId}-ticket`, {
action: "update",
ticket,
ticketId: ticket.id
});
// io.to("pending").emit(`company-${companyId}-ticket`, {
// action: "update",
// ticket,
// });
logger.info(`Atendimento Perdido: ${ticket.id} - Empresa: ${companyId}`);
});
} else {
logger.info(`Nenhum atendimento perdido encontrado - Empresa: ${companyId}`);
}
} else {
logger.info(`Condição não respeitada - Empresa: ${companyId}`);
}
}
}
});
});
} catch (e: any) {
Sentry.captureException(e);
logger.error("SearchForQueue -> VerifyQueue: error", e.message);
throw e;
}
}; */}
/**
 * Starts a cron job that, once per minute, runs `ClosedAllOpenTickets` for
 * every company.
 *
 * Each company is handled independently: a failure is reported to Sentry and
 * logged, and does not stop the other companies. Errors are NOT rethrown from
 * inside the cron callback, because nothing awaits that callback and a rethrow
 * would only surface as an unhandled promise rejection.
 */
async function handleCloseTicketsAutomatic() {
  const job = new CronJob('*/1 * * * *', async () => {
    try {
      const companies = await Company.findAll();
      await Promise.all(
        companies.map(async c => {
          try {
            await ClosedAllOpenTickets(c.id);
          } catch (e: any) {
            Sentry.captureException(e);
            logger.error("ClosedAllOpenTickets -> Verify: error", e.message);
          }
        })
      );
    } catch (e: any) {
      // Failure while listing companies (e.g. database unavailable).
      Sentry.captureException(e);
      logger.error("ClosedAllOpenTickets -> Verify: error", e.message);
    }
  });
  job.start();
}
/**
 * Processor for the repeating "Verify" job on the schedule monitor queue.
 *
 * Finds schedules with status "PENDENTE", no `sentAt`, and a `sendAt` within
 * the next 30 seconds; marks each one "AGENDADA" and enqueues a "SendMessage"
 * job for it with a 40 second delay.
 *
 * The per-schedule work is awaited so that a failing update or enqueue is
 * caught by the try/catch below instead of becoming an unhandled rejection.
 *
 * @param job Bull job (its payload is not used).
 * @throws Rethrows any failure (after reporting it) so the job is marked as failed.
 */
async function handleVerifySchedules(job) {
  try {
    const { count, rows: schedules } = await Schedule.findAndCountAll({
      where: {
        status: "PENDENTE",
        sentAt: null,
        sendAt: {
          [Op.gte]: moment().format("YYYY-MM-DD HH:mm:ss"),
          [Op.lte]: moment().add(30, "seconds").format("YYYY-MM-DD HH:mm:ss")
        }
      },
      include: [{ model: Contact, as: "contact" }]
    });
    if (count > 0) {
      await Promise.all(
        schedules.map(async schedule => {
          await schedule.update({
            status: "AGENDADA"
          });
          await sendScheduledMessages.add(
            "SendMessage",
            { schedule },
            { delay: 40000 }
          );
          // The contact association may be missing; do not fail the job over a log line.
          logger.info(`Disparo agendado para: ${schedule.contact?.name}`);
        })
      );
    }
  } catch (e: any) {
    Sentry.captureException(e);
    logger.error("SendScheduledMessage -> Verify: error", e.message);
    throw e;
  }
}
/**
 * Processor for "SendMessage" jobs on the scheduled-messages queue.
 *
 * Sends the scheduled body (and optional media) to the schedule's contact via
 * the company's default WhatsApp connection, then marks the schedule record
 * "ENVIADA". On failure the record is marked "ERRO" and the error is rethrown.
 *
 * @param job Bull job whose `data.schedule` is the serialized schedule
 *            (including its `contact`).
 * @throws Rethrows the send/update failure so the job is marked as failed.
 */
async function handleSendScheduledMessage(job) {
  const {
    data: { schedule }
  } = job;
  let scheduleRecord: Schedule | null = null;
  try {
    scheduleRecord = await Schedule.findByPk(schedule.id);
  } catch (e) {
    // Best effort: the message is still sent even if the record cannot be loaded.
    Sentry.captureException(e);
    logger.info(`Erro ao tentar consultar agendamento: ${schedule.id}`);
  }
  try {
    const whatsapp = await GetDefaultWhatsApp(schedule.companyId);
    let filePath = null;
    if (schedule.mediaPath) {
      filePath = path.resolve("public", schedule.mediaPath);
    }
    await SendMessage(whatsapp, {
      number: schedule.contact.number,
      body: formatBody(schedule.body, schedule.contact),
      mediaPath: filePath
    });
    await scheduleRecord?.update({
      sentAt: moment().format("YYYY-MM-DD HH:mm"),
      status: "ENVIADA"
    });
    logger.info(`Mensagem agendada enviada para: ${schedule.contact.name}`);
    // Housekeeping only: a failed clean must neither reject unhandled nor flip
    // an already-sent schedule to "ERRO", so it is handled on its own.
    sendScheduledMessages.clean(15000, "completed").catch((cleanError: any) => {
      Sentry.captureException(cleanError);
      logger.error("SendScheduledMessage -> clean: error", cleanError?.message);
    });
  } catch (e: any) {
    Sentry.captureException(e);
    try {
      await scheduleRecord?.update({
        status: "ERRO"
      });
    } catch (updateError) {
      // Keep the original error as the one that is logged and rethrown.
      Sentry.captureException(updateError);
    }
    logger.error("SendScheduledMessage -> SendMessage: error", e.message);
    throw e;
  }
}
/**
 * Processor for the repeating "VerifyCampaigns" job.
 *
 * Selects campaigns with status 'PROGRAMADA' whose `scheduledAt` falls within
 * the next hour and enqueues one "ProcessCampaign" job per campaign. The
 * milliseconds remaining until `scheduledAt` are passed in the job payload as
 * `delay`; they are not applied as a Bull delay option here.
 *
 * @todo Implement campaign filtering.
 * @param job Bull job (its payload is not used).
 */
async function handleVerifyCampaigns(job) {
  const campaigns: { id: number; scheduledAt: string }[] =
    await sequelize.query(
      `select id, "scheduledAt" from "Campaigns" c
where "scheduledAt" between now() and now() + '1 hour'::interval and status = 'PROGRAMADA'`,
      { type: QueryTypes.SELECT }
    );
  if (campaigns.length > 0)
    logger.info(`Campanhas encontradas: ${campaigns.length}`);
  for (const campaign of campaigns) {
    try {
      const now = moment();
      const scheduledAt = moment(campaign.scheduledAt);
      const delay = scheduledAt.diff(now, "milliseconds");
      logger.info(
        `Campanha enviada para a fila de processamento: Campanha=${campaign.id}, Delay Inicial=${delay}`
      );
      // Awaited so that an enqueue failure is caught below instead of
      // escaping as an unhandled promise rejection.
      await campaignQueue.add(
        "ProcessCampaign",
        {
          id: campaign.id,
          delay
        },
        {
          removeOnComplete: true
        }
      );
    } catch (err: any) {
      Sentry.captureException(err);
      logger.error(`campaignQueue -> VerifyCampaigns -> error: ${err.message}`);
    }
  }
}
/**
 * Loads a campaign by primary key together with the associations the
 * campaign pipeline reads: its contact list (with the contacts whose
 * `isWhatsappValid` is true), its whatsapp connection and its shipping
 * records (each with its contact).
 *
 * @param id Campaign primary key.
 * @returns The campaign, or null when `findByPk` finds nothing.
 */
async function getCampaign(id) {
  const validContactsInclude = {
    model: ContactListItem,
    as: "contacts",
    attributes: ["id", "name", "number", "email", "isWhatsappValid"],
    where: { isWhatsappValid: true }
  };

  const contactListInclude = {
    model: ContactList,
    as: "contactList",
    attributes: ["id", "name"],
    include: [validContactsInclude]
  };

  const whatsappInclude = {
    model: Whatsapp,
    as: "whatsapp",
    attributes: ["id", "name"]
  };

  const shippingInclude = {
    model: CampaignShipping,
    as: "shipping",
    include: [{ model: ContactListItem, as: "contact" }]
  };

  const campaign = await Campaign.findByPk(id, {
    include: [contactListInclude, whatsappInclude, shippingInclude]
  });
  return campaign;
}
/**
 * Loads a contact-list item by primary key, restricted to the fields used
 * when building campaign messages.
 *
 * @param id ContactListItem primary key.
 * @returns The item, or null when `findByPk` finds nothing.
 */
async function getContact(id) {
  const attributes = ["id", "name", "number", "email"];
  const contact = await ContactListItem.findByPk(id, { attributes });
  return contact;
}
/**
 * Reads the campaign settings stored for the campaign's company.
 *
 * Each recognised key has its stored value parsed with `JSON.parse`; keys
 * that are absent keep the defaults (messageInterval 20,
 * longerIntervalAfter 20, greaterInterval 60, variables []).
 *
 * @param campaign Object exposing `companyId`.
 * @returns `{ messageInterval, longerIntervalAfter, greaterInterval, variables }`.
 */
async function getSettings(campaign) {
  const rows = await CampaignSetting.findAll({
    where: { companyId: campaign.companyId },
    attributes: ["key", "value"]
  });

  const resolved = {
    messageInterval: 20 as number,
    longerIntervalAfter: 20 as number,
    greaterInterval: 60 as number,
    variables: [] as any[]
  };

  for (const row of rows) {
    switch (row.key) {
      case "messageInterval":
        resolved.messageInterval = JSON.parse(row.value);
        break;
      case "longerIntervalAfter":
        resolved.longerIntervalAfter = JSON.parse(row.value);
        break;
      case "greaterInterval":
        resolved.greaterInterval = JSON.parse(row.value);
        break;
      case "variables":
        resolved.variables = JSON.parse(row.value);
        break;
      default:
        // Unrecognised keys are ignored.
        break;
    }
  }

  return resolved;
}
/**
 * Converts a duration in seconds to milliseconds.
 *
 * @param seconds Duration in seconds.
 * @returns The same duration multiplied by 1000.
 */
export function parseToMilliseconds(seconds) {
  const millisecondsPerSecond = 1000;
  return seconds * millisecondsPerSecond;
}
/**
 * Waits for the given number of seconds, logging when the wait starts and
 * when it ends.
 *
 * @param seconds How long to wait, in seconds.
 * @returns A promise that resolves to `true` once the timer fires.
 */
async function sleep(seconds) {
  const clock = () => moment().format("HH:mm:ss");

  logger.info(`Sleep de ${seconds} segundos iniciado: ${clock()}`);

  return new Promise(resolve => {
    const onElapsed = () => {
      logger.info(`Sleep de ${seconds} segundos finalizado: ${clock()}`);
      resolve(true);
    };
    setTimeout(onElapsed, parseToMilliseconds(seconds));
  });
}
/**
 * Collects the campaign's message slots (`message1` … `message5`) that are
 * neither nil nor empty, in slot order.
 *
 * @param campaign Object exposing `message1` through `message5`.
 * @returns Array with the non-empty messages.
 */
function getCampaignValidMessages(campaign) {
  const slots = [
    campaign.message1,
    campaign.message2,
    campaign.message3,
    campaign.message4,
    campaign.message5
  ];
  return slots.filter(text => !isEmpty(text) && !isNil(text));
}
/**
 * Collects the campaign's confirmation message slots
 * (`confirmationMessage1` … `confirmationMessage5`) that are neither nil nor
 * empty, in slot order.
 *
 * @param campaign Object exposing `confirmationMessage1` through `confirmationMessage5`.
 * @returns Array with the non-empty confirmation messages.
 */
function getCampaignValidConfirmationMessages(campaign) {
  const slots = [
    campaign.confirmationMessage1,
    campaign.confirmationMessage2,
    campaign.confirmationMessage3,
    campaign.confirmationMessage4,
    campaign.confirmationMessage5
  ];
  return slots.filter(text => !isEmpty(text) && !isNil(text));
}
/**
 * Fills the placeholders of a campaign message.
 *
 * `{nome}`, `{email}` and `{numero}` are replaced with the contact's `name`,
 * `email` and `number`; then every `{key}` from `variables` is replaced with
 * the matching `value`. Substitutions are applied in that order, each one on
 * the result of the previous one.
 *
 * Placeholders are matched and inserted as literal text:
 * - values containing `$&`, `$1`, `$$`, … are inserted verbatim rather than
 *   being interpreted as `String.prototype.replace` patterns;
 * - variable keys containing regex metacharacters (`.`, `+`, `(`, …) match
 *   only themselves and can no longer throw while building a RegExp;
 * - null/undefined values become an empty string instead of "null"/"undefined".
 *
 * @param msg Message template.
 * @param variables List of `{ key, value }` substitutions.
 * @param contact Object exposing `name`, `email` and `number`.
 * @returns The message with all placeholders substituted.
 */
function getProcessedMessage(msg: string, variables: any[], contact: any) {
  // split/join performs a literal, global replacement.
  const substitute = (text: string, token: string, value: unknown): string =>
    text.split(token).join(value == null ? "" : String(value));

  let finalMessage = substitute(msg, "{nome}", contact.name);
  finalMessage = substitute(finalMessage, "{email}", contact.email);
  finalMessage = substitute(finalMessage, "{numero}", contact.number);

  for (const variable of variables) {
    finalMessage = substitute(finalMessage, `{${variable.key}}`, variable.value);
  }

  return finalMessage;
}
/**
 * Returns a random integer in the half-open range [min, max).
 *
 * The previous formula (`floor(random * max) + min`) produced values in
 * [min, min + max), which only matched the intended range when `min` was 0.
 * Calls with `min === 0` behave exactly as before.
 *
 * @param min Inclusive lower bound.
 * @param max Exclusive upper bound.
 * @returns Integer `v` with `min <= v < max` (returns `min` when `max === min`).
 */
export function randomValue(min, max) {
  return Math.floor(Math.random() * (max - min)) + min;
}
/**
 * Marks the campaign "FINALIZADA" when the number of shipping records with a
 * `deliveredAt` equals the number of contacts loaded on its contact list,
 * then broadcasts an "update" event for the campaign on the company's main
 * channel (the event is emitted whether or not the campaign was finalized).
 *
 * @param campaign Campaign instance with `contactList.contacts` loaded.
 */
async function verifyAndFinalizeCampaign(campaign) {
  const totalContacts = campaign.contactList.contacts.length;

  const deliveredCount = await CampaignShipping.count({
    where: {
      campaignId: campaign.id,
      deliveredAt: { [Op.not]: null }
    }
  });

  if (totalContacts === deliveredCount) {
    await campaign.update({ status: "FINALIZADA", completedAt: moment() });
  }

  const room = `company-${campaign.companyId}-mainchannel`;
  const eventName = `company-${campaign.companyId}-campaign`;
  getIO().to(room).emit(eventName, {
    action: "update",
    record: campaign
  });
}
/**
 * Computes the delay, in milliseconds, for the contact at `index`.
 *
 * The result is the whole-second distance from now to `baseDelay`, converted
 * to milliseconds, plus one extra interval: `greaterInterval` once `index`
 * has passed `longerIntervalAfter`, `messageInterval` otherwise.
 *
 * @param index Zero-based position of the contact in the campaign.
 * @param baseDelay Date at which this contact's slot is scheduled.
 * @param longerIntervalAfter Contact index after which the longer interval applies (a count, not a duration).
 * @param greaterInterval Longer interval, in milliseconds.
 * @param messageInterval Regular interval, in milliseconds.
 * @returns Delay in milliseconds (may be negative when `baseDelay` is in the past).
 */
function calculateDelay(index, baseDelay, longerIntervalAfter, greaterInterval, messageInterval) {
  const diffSeconds = differenceInSeconds(baseDelay, new Date());
  const extraInterval = index > longerIntervalAfter ? greaterInterval : messageInterval;
  return diffSeconds * 1000 + extraInterval;
}

/**
 * Processor for "ProcessCampaign" jobs.
 *
 * Loads the campaign and its settings, enqueues one "PrepareContact" job per
 * contact with a progressively larger delay, and finally sets the campaign
 * status to "EM_ANDAMENTO".
 *
 * Units are kept explicit because they were previously mixed up:
 * - `longerIntervalAfter` is compared against the contact index, so it is
 *   used as a plain count (it used to be multiplied by 1000 first, which made
 *   the longer interval unreachable for realistic list sizes);
 * - intervals are added to the schedule with `addSeconds`, so they are passed
 *   in seconds (the longer interval used to be passed in milliseconds);
 * - `calculateDelay` receives both intervals in milliseconds.
 *
 * Failures are reported to Sentry and logged; they are not rethrown.
 *
 * @param job Bull job whose `data` is a `ProcessCampaignData`.
 */
async function handleProcessCampaign(job) {
  try {
    const { id }: ProcessCampaignData = job.data;
    const campaign = await getCampaign(id);
    if (!campaign) {
      // Checked before reading settings, which need campaign.companyId.
      throw new Error(`campaign ${id} not found`);
    }
    const settings = await getSettings(campaign);
    const contacts = campaign.contactList?.contacts;
    if (!isArray(contacts)) {
      return;
    }

    // Settings are parsed from JSON and may arrive as numeric strings.
    const longerIntervalAfter = Number(settings.longerIntervalAfter);
    const greaterIntervalSeconds = Number(settings.greaterInterval);
    const messageIntervalSeconds = Number(settings.messageInterval);
    const greaterIntervalMs = parseToMilliseconds(greaterIntervalSeconds);
    const messageIntervalMs = parseToMilliseconds(messageIntervalSeconds);

    let baseDelay = campaign.scheduledAt;
    const queuePromises = [];
    for (let i = 0; i < contacts.length; i++) {
      const stepSeconds = i > longerIntervalAfter ? greaterIntervalSeconds : messageIntervalSeconds;
      baseDelay = addSeconds(baseDelay, stepSeconds);
      const delay = calculateDelay(i, baseDelay, longerIntervalAfter, greaterIntervalMs, messageIntervalMs);
      const queuePromise = campaignQueue.add(
        "PrepareContact",
        {
          contactId: contacts[i].id,
          campaignId: campaign.id,
          variables: settings.variables,
          delay
        },
        { removeOnComplete: true }
      );
      queuePromises.push(queuePromise);
      logger.info(`Registro enviado pra fila de disparo: Campanha=${campaign.id};Contato=${contacts[i].name};delay=${delay}`);
    }
    await Promise.all(queuePromises);
    await campaign.update({ status: "EM_ANDAMENTO" });
  } catch (err: any) {
    Sentry.captureException(err);
    logger.error(`campaignQueue -> ProcessCampaign -> error: ${err.message}`);
  }
}
// Round-robin cursor used to pick the next campaign message. It is module
// state, so it is shared by every campaign prepared by this process.
let ultima_msg = 0;

/**
 * Processor for "PrepareContact" jobs.
 *
 * Builds the message (and, when the campaign requires confirmation, the
 * confirmation message) for one contact, stores it in a CampaignShipping
 * record, and enqueues the delayed "DispatchCampaign" job unless the record
 * was already delivered or is awaiting confirmation.
 *
 * Failures are reported to Sentry and logged; they are not rethrown.
 *
 * @param job Bull job whose `data` is a `PrepareContactData`.
 */
async function handlePrepareContact(job) {
  try {
    const { contactId, campaignId, delay, variables }: PrepareContactData =
      job.data;
    const campaign = await getCampaign(campaignId);
    const contact = await getContact(contactId);
    if (!campaign) {
      throw new Error(`campaign ${campaignId} not found`);
    }
    if (!contact) {
      throw new Error(`contact ${contactId} not found`);
    }

    const campaignShipping: any = {};
    campaignShipping.number = contact.number;
    campaignShipping.contactId = contactId;
    campaignShipping.campaignId = campaignId;

    const messages = getCampaignValidMessages(campaign);
    if (messages.length) {
      // The cursor may have been left at an index that is out of range for
      // this campaign (another campaign with more messages ran before), so it
      // is wrapped before use. Previously that produced an undefined message
      // and the contact was dropped with an error.
      const messageIndex = ultima_msg % messages.length;
      ultima_msg = (messageIndex + 1) % messages.length;
      const message = getProcessedMessage(
        messages[messageIndex],
        variables,
        contact
      );
      // The text is stored prefixed with U+200C (zero-width non-joiner) and a space.
      campaignShipping.message = `\u200c ${message}`;
    }

    if (campaign.confirmation) {
      const confirmationMessages =
        getCampaignValidConfirmationMessages(campaign);
      if (confirmationMessages.length) {
        const confirmationIndex = randomValue(0, confirmationMessages.length);
        const message = getProcessedMessage(
          confirmationMessages[confirmationIndex],
          variables,
          contact
        );
        campaignShipping.confirmationMessage = `\u200c ${message}`;
      }
    }

    const [record, created] = await CampaignShipping.findOrCreate({
      where: {
        campaignId: campaignShipping.campaignId,
        contactId: campaignShipping.contactId
      },
      defaults: campaignShipping
    });

    const isPending =
      record.deliveredAt === null && record.confirmationRequestedAt === null;

    // An existing record that has not been sent yet is refreshed with the
    // newly built message.
    if (!created && isPending) {
      record.set(campaignShipping);
      await record.save();
    }

    if (isPending) {
      const nextJob = await campaignQueue.add(
        "DispatchCampaign",
        {
          campaignId: campaign.id,
          campaignShippingId: record.id,
          contactListItemId: contactId
        },
        {
          delay
        }
      );
      await record.update({ jobId: nextJob.id });
    }

    await verifyAndFinalizeCampaign(campaign);
  } catch (err: any) {
    Sentry.captureException(err);
    logger.error(`campaignQueue -> PrepareContact -> error: ${err.message}`);
  }
}
/**
 * Processor for "DispatchCampaign" jobs.
 *
 * Sends the prepared campaign content to one contact: first every file of
 * the campaign's file list (if any), then either the campaign media with the
 * message as its third `getMessageOptions` argument, or the plain text
 * message. Afterwards the shipping record gets its `deliveredAt` and the
 * campaign is checked for completion.
 *
 * Missing prerequisites (campaign, whatsapp connection, wbot session,
 * shipping record) are logged and the job returns without sending. Other
 * failures are reported to Sentry and logged; nothing is rethrown.
 *
 * @param job Bull job whose `data` is a `DispatchCampaignData`.
 */
async function handleDispatchCampaign(job) {
  try {
    const { data } = job;
    const { campaignShippingId, campaignId }: DispatchCampaignData = data;
    const campaign = await getCampaign(campaignId);

    // Validate the campaign and its connection BEFORE asking for the wbot
    // session, which needs campaign.whatsapp.
    if (!campaign) {
      logger.error(`campaignQueue -> DispatchCampaign -> error: campaign not found`);
      return;
    }
    if (!campaign.whatsapp) {
      logger.error(`campaignQueue -> DispatchCampaign -> error: whatsapp not found`);
      return;
    }
    const wbot = await GetWhatsappWbot(campaign.whatsapp);
    if (!wbot) {
      logger.error(`campaignQueue -> DispatchCampaign -> error: wbot not found`);
      return;
    }
    if (!wbot?.user?.id) {
      logger.error(`campaignQueue -> DispatchCampaign -> error: wbot user not found`);
      return;
    }

    logger.info(
      `Disparo de campanha solicitado: Campanha=${campaignId};Registro=${campaignShippingId}`
    );
    const campaignShipping = await CampaignShipping.findByPk(
      campaignShippingId,
      {
        include: [{ model: ContactListItem, as: "contact" }]
      }
    );
    if (!campaignShipping) {
      logger.error(`campaignQueue -> DispatchCampaign -> error: campaign shipping not found`);
      return;
    }

    const chatId = `${campaignShipping.number}@s.whatsapp.net`;
    // When the campaign asks for confirmation and none was recorded yet, the
    // confirmation message is sent instead of the regular one.
    const awaitingConfirmation =
      campaign.confirmation && campaignShipping.confirmation === null;
    let body = campaignShipping.message;
    if (awaitingConfirmation) {
      body = campaignShipping.confirmationMessage;
    }

    if (!isNil(campaign.fileListId)) {
      try {
        const publicFolder = path.resolve(__dirname, "..", "public");
        const files = await ShowFileService(campaign.fileListId, campaign.companyId);
        const folder = path.resolve(publicFolder, "fileList", String(files.id));
        for (const [, file] of files.options.entries()) {
          const options = await getMessageOptions(file.path, path.resolve(folder, file.path), file.name);
          await wbot.sendMessage(chatId, { ...options });
        }
      } catch (error) {
        // Best effort: a failing file list does not stop the main message.
        logger.info(error);
      }
    }

    if (campaign.mediaPath) {
      const publicFolder = path.resolve(__dirname, "..", "public");
      const filePath = path.join(publicFolder, campaign.mediaPath);
      const options = await getMessageOptions(campaign.mediaName, filePath, body);
      if (Object.keys(options).length) {
        await wbot.sendMessage(chatId, { ...options });
      }
    } else {
      await wbot.sendMessage(chatId, {
        text: body
      });
      if (awaitingConfirmation) {
        await campaignShipping.update({ confirmationRequestedAt: moment() });
      }
    }

    await campaignShipping.update({ deliveredAt: moment() });
    await verifyAndFinalizeCampaign(campaign);

    // NOTE(review): verifyAndFinalizeCampaign already emits this same event;
    // the second emit is kept so clients see no change in behavior.
    const io = getIO();
    io.to(`company-${campaign.companyId}-mainchannel`).emit(`company-${campaign.companyId}-campaign`, {
      action: "update",
      record: campaign
    });
    logger.info(
      `Campanha enviada para: Campanha=${campaignId};Contato=${campaignShipping.contact?.name}`
    );
  } catch (err: any) {
    Sentry.captureException(err);
    logger.error(err.message);
    console.log(err.stack);
  }
}
/**
 * Processor for the repeating "VerifyLoginStatus" job.
 *
 * Selects users flagged online whose `updatedAt` is older than five minutes
 * and sets `online` to false on each, one at a time. A failure on one user
 * is reported to Sentry and does not stop the others.
 *
 * @param job Bull job (its payload is not used).
 */
async function handleLoginStatus(job) {
  const staleUsers: { id: number }[] = await sequelize.query(
    `select id from "Users" where "updatedAt" < now() - '5 minutes'::interval and online = true`,
    { type: QueryTypes.SELECT }
  );

  for (const { id: userId } of staleUsers) {
    try {
      const user = await User.findByPk(userId);
      await user.update({ online: false });
      logger.info(`Usuário passado para offline: ${userId}`);
    } catch (e: any) {
      Sentry.captureException(e);
    }
  }
}
/**
 * Creates an "open" invoice for a company when its due date is fewer than 20
 * days away and no invoice exists yet for that due date.
 *
 * SQL values are passed as replacements instead of being interpolated into
 * the statement, so a plan name containing a quote can neither break the
 * query nor inject SQL.
 *
 * NOTE(review): the existence check and the insert are separate statements;
 * overlapping cron ticks could still race. A unique constraint on
 * ("companyId", "dueDate") would close that gap — confirm with the schema.
 *
 * @param company Company instance exposing `id`, `dueDate` and `planId`.
 */
async function createInvoiceIfDue(company) {
  const dueDate = company.dueDate;
  // Whole-day distance between today and the due date. An invalid or missing
  // due date yields NaN, which fails the comparison and skips the company.
  const daysUntilDue = moment(dueDate)
    .startOf("day")
    .diff(moment().startOf("day"), "days", true);
  if (!(daysUntilDue < 20)) {
    return;
  }

  const existing = await sequelize.query(
    `SELECT COUNT(*) mycount FROM "Invoices" WHERE "companyId" = ? AND "dueDate"::text LIKE ?;`,
    {
      // "YYYY" is moment's calendar-year token; the previous lowercase
      // "yyyy" is not the calendar-year token.
      replacements: [company.id, `${moment(dueDate).format("YYYY-MM-DD")}%`],
      type: QueryTypes.SELECT
    }
  );
  // COUNT(*) may come back as a string from the driver; compare numerically.
  if (Number(existing[0]["mycount"]) > 0) {
    return;
  }

  const plan = await Plan.findByPk(company.planId);
  if (!plan) {
    logger.error(`InvoiceCreate -> error: plano não encontrado - Empresa: ${company.id}`);
    return;
  }

  const timestamp = moment().format();
  const date = moment(dueDate).format();
  await sequelize.query(
    `INSERT INTO "Invoices" (detail, status, value, "updatedAt", "createdAt", "dueDate", "companyId")
VALUES (?, 'open', ?, ?, ?, ?, ?);`,
    {
      replacements: [plan.name, plan.value, timestamp, timestamp, date, company.id],
      type: QueryTypes.INSERT
    }
  );
  // Email notification of the new invoice (nodemailer) is currently disabled.
}

/**
 * Starts a cron job that, every 5 seconds, makes sure each company whose due
 * date is near has an invoice.
 *
 * Companies are processed independently: a failure is reported to Sentry and
 * logged without affecting the others, and nothing is rethrown from the cron
 * callback (a rethrow there would only become an unhandled rejection).
 */
async function handleInvoiceCreate() {
  logger.info("Iniciando geração de boletos");
  const job = new CronJob('*/5 * * * * *', async () => {
    try {
      const companies = await Company.findAll();
      await Promise.all(
        companies.map(async c => {
          try {
            await createInvoiceIfDue(c);
          } catch (e: any) {
            Sentry.captureException(e);
            logger.error(`InvoiceCreate -> error: Empresa=${c.id}; ${e.message}`);
          }
        })
      );
    } catch (e: any) {
      // Failure while listing companies (e.g. database unavailable).
      Sentry.captureException(e);
      logger.error(`InvoiceCreate -> error: ${e.message}`);
    }
  });
  job.start();
}
// Module-load side effect: both cron jobs are started as soon as this file is
// imported. The returned promises are intentionally not awaited.
void handleCloseTicketsAutomatic();
void handleInvoiceCreate();
/**
 * Registers every queue processor of this module and schedules the repeating
 * jobs.
 *
 * The `add` calls are awaited together so that a failure to schedule a
 * repeating job rejects the returned promise instead of being lost as an
 * unhandled rejection. The `process` calls are deliberately not awaited.
 *
 * @returns Promise that resolves once all repeating jobs have been scheduled.
 */
export async function startQueueProcess() {
  logger.info("Iniciando processamento de filas");

  messageQueue.process("SendMessage", handleSendMessage);
  scheduleMonitor.process("Verify", handleVerifySchedules);
  sendScheduledMessages.process("SendMessage", handleSendScheduledMessage);
  campaignQueue.process("VerifyCampaigns", handleVerifyCampaigns);
  campaignQueue.process("ProcessCampaign", handleProcessCampaign);
  campaignQueue.process("PrepareContact", handlePrepareContact);
  campaignQueue.process("DispatchCampaign", handleDispatchCampaign);
  userMonitor.process("VerifyLoginStatus", handleLoginStatus);
  //queueMonitor.process("VerifyQueueStatus", handleVerifyQueue);

  await Promise.all([
    // Every 5 seconds: look for schedules that are about to fall due.
    scheduleMonitor.add(
      "Verify",
      {},
      {
        repeat: { cron: "*/5 * * * * *", key: "verify" },
        removeOnComplete: true
      }
    ),
    // Every 20 seconds: look for campaigns scheduled within the next hour.
    // The repeat key is kept exactly as stored, misspelling included.
    campaignQueue.add(
      "VerifyCampaigns",
      {},
      {
        repeat: { cron: "*/20 * * * * *", key: "verify-campaing" },
        removeOnComplete: true
      }
    ),
    // Every minute: flag stale users as offline.
    userMonitor.add(
      "VerifyLoginStatus",
      {},
      {
        repeat: { cron: "* * * * *", key: "verify-login" },
        removeOnComplete: true
      }
    ),
    // NOTE(review): the "VerifyQueueStatus" processor is commented out above,
    // so nothing in this file consumes these jobs — confirm whether this
    // repeating job should still be scheduled.
    queueMonitor.add(
      "VerifyQueueStatus",
      {},
      {
        repeat: { cron: "*/20 * * * * *" },
        removeOnComplete: true
      }
    )
  ]);
}