여태까지 RabbitMQ의 내용에 대해 개괄적으로 알아보았습니다. (하지만 ACK에 대해서는 아직 알아보진 않았습니다. 이는 다음 스터디로 넘기려고 합니다) 이러한 내용을 기반으로 채점 서버에서 어떤식으로 RabbitMQ를 사용하는지 코드 레벨에서 알아보려고 합니다. Golang 기반이지만, 역할을 뚜렷하게 설명해드릴테니 거부감을 버리셨으면 합니다. (Nest 서버로 RabbitMQ에 대해 논하는 것은 Message Queue 사용 이유가 와닿지 않을 것 같아서 Iris 서버로 설명하게 되었습니다)
connector.Factory(
connector.RABBIT_MQ,
connector.Providers{Router: routeProvider, Logger: logProvider},
rabbitmq.ConsumerConfig{
AmqpURI: uri,
ConnectionName: utils.Getenv("RABBITMQ_CONSUMER_CONNECTION_NAME", "iris-consumer"),
QueueName: utils.Getenv("RABBITMQ_CONSUMER_QUEUE_NAME", "client.q.judge.submission"),
Ctag: utils.Getenv("RABBITMQ_CONSUMER_TAG", "consumer-tag"),
},
rabbitmq.ProducerConfig{
AmqpURI: uri,
ConnectionName: utils.Getenv("RABBITMQ_PRODUCER_CONNECTION_NAME", "iris-producer"),
ExchangeName: utils.Getenv("RABBITMQ_PRODUCER_EXCHANGE_NAME", "iris.e.direct.judge"),
RoutingKey: utils.Getenv("RABBITMQ_PRODUCER_ROUTING_KEY", "judge.result"),
},
).Connect(context.Background())
go서버가 실행되는 main함수에서 직접적인 RabbitMQ와의 통신을 맺습니다.
Factory
함수로 Connection이 맺어진 Connector가 반환됩니다.
Connector가 Connect 함수를 호출하면서, Consumer의 Channel, Producer의 Channel, Subscribe, 그리고, Subscribe하면서 Queue에 적재되는 메시지를 처리하기 위한 goroutine이 실행됩니다. (goroutine은 message를 비동기적으로 처리하는 친구입니다)
그럼 1번의 Factory 함수부터 살펴봅시다.
func Factory(c Module, p Providers, args ...any) Connector {
switch c {
case RABBIT_MQ:
consumerConfig, ok := args[0].(rabbitmq.ConsumerConfig)
if !ok {
panic(fmt.Sprintf("Invalid consumer config: %v", consumerConfig))
}
consumer, err := rabbitmq.NewConsumer(consumerConfig, p.Logger)
if err != nil {
panic(err)
}
producerConfig, ok := args[1].(rabbitmq.ProducerConfig)
if !ok {
panic(fmt.Sprintf("Invalid producer config: %v", producerConfig))
}
producer, err := rabbitmq.NewProducer(producerConfig, p.Logger)
if err != nil {
panic(err)
}
return rabbitmq.NewConnector(consumer, producer, p.Router, p.Logger)
case HTTP:
panic("Need to be implemented")
case FILE:
panic("Need to be implemented")
case CONSOLE:
panic("Need to be implemented")
default:
panic("Unsupported Connector")
}
}
아까 Main함수에서, Consumer와 관련된 설정, Producer에 관한 설정을 Factory 함수의 파라미터로 전달하였습니다. Factory 함수를 해당 인자를 받고, Consumer와 Producer를 생성하고, 이러한 생성 객체를 기반으로 NewConnector
함수로 Connector 객체를 반환합니다.
여기서 주목해야할 부분이, Producer와 Consumer 객체를 생성할 때, Connection을 만든다는 점입니다.
func NewConsumer(config ConsumerConfig, logger logger.Logger) (*consumer, error) {
// Create New RabbitMQ Connection (go <-> RabbitMQ)
amqpConfig := amqp.Config{Properties: amqp.NewConnectionProperties()}
amqpConfig.Properties.SetClientConnectionName(config.ConnectionName)
connection, err := amqp.DialConfig(config.AmqpURI, amqpConfig)
if err != nil {
return nil, fmt.Errorf("consumer: dial failed: %w", err)
}
return &consumer{
connection: connection,
channel: nil,
queueName: config.QueueName,
tag: config.Ctag,
Done: make(chan error),
logger: logger,
}, nil
}
이렇게 Consumer 객체를 생성할 때, rabbitMQ와의 커넥션이 맺어진 consumer 객체를 반환합니다. Producer도 마찬가지이므로, 생략하겠습니다.
우리가 저번 주에 하나의 Connection을 기반으로 Channel을 만드는 것이 좋다고 공부했었습니다. 그렇다면 Main함수에서 Connector의 Connect함수를 호출할 때, Channel을 만드는 로직이 포함될 것임을 유추할 수 있습니다.
다시, Factory 함수로 돌아와서 반환되는 Connector정보는 다음과 같습니다.
func NewConnector(
consumer Consumer,
producer Producer,
router router.Router,
logger logger.Logger,
) *connector {
return **&connector{consumer, producer, router, make(chan error), logger}**
}
이렇게 MQ와 연결된 consumer, producer 객체 정보가 포함된 Connector 객체를 반환합니다.
그렇다면 이제 본격적인 Connector의 Connect
함수를 살펴봅시다.
func (c *connector) Connect(ctx context.Context) {
connectorCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer func() {
cancel()
c.consumer.CleanUp()
c.producer.CleanUp()
}()
err := c.consumer.OpenChannel()
if err != nil {
panic(err)
}
err = c.producer.OpenChannel()
if err != nil {
panic(err)
}
messageCh, err := c.consumer.Subscribe()
if err != nil {
panic(err)
}
for message := range messageCh {
go c.handle(message, connectorCtx)
}
}
반환되는 Connector에서 consumer, producer 객체를 꺼내서 Channel을 만들고, Consumer의 경우 Subscribe까지 진행합니다. 이 때, Subscribe의 반환 값은 messageCh이라는 Golang의 Channel
입니다.
이러한 Channel 또한 메시지큐의 성격과 비슷합니다. 즉, Channel(messageCh)에 Message Queue로부터 Message가 도착하면, 이를 즉각적으로 꺼내서 코드를 채점합니다. 아래의 코드가 이 코드입니다.
for message := range messageCh {
go c.handle(message, connectorCtx)
}
Channel에서 들어오는 요청들을 goroutine으로 비동기 처리합니다.
여기까지는, Subscribe 까지의 로직만 살펴보았습니다. Publish의 역할은 어디서 처리할까요? 당연히 Subscribe로 받은 요청을 처리하는 위의 코드, go c.handle(message, connectorCtx)
입니다.
func (c *connector) handle(message amqp.Delivery, ctx context.Context) {
var result []byte
if message.Type == "" {
result = router.NewResponse("", nil, fmt.Errorf("type(message property) must not be empty")).Marshal()
} else if message.MessageId == "" {
result = router.NewResponse("", nil, fmt.Errorf("message_id(message property) must not be empty")).Marshal()
} else {
result = c.router.Route(message.Type, message.MessageId, message.Body)
}
if err := c.producer.Publish(result, ctx); err != nil {
c.logger.Log(logger.ERROR, fmt.Sprintf("failed to publish result: %s: %s", string(result), err))
// nack
}
if err := message.Ack(false); err != nil {
c.logger.Log(logger.ERROR, fmt.Sprintf("failed to ack message: %s: %s", string(message.Body), err))
// retry
}
}
같은 Connector.go 파일에 속한 handle함수를 호출합니다. 이 후 로직은 다음과 같습니다.
인자로 들어온 message의 유효성을 체크합니다. (이전에 보았던 message Type(PUBLISH_TYPE), message Id(submission Id) 가 포함된 요청인지 확인합니다.
Route
함수를 통해 코드를 컴파일하여 채점하고, 결과를 반환합니다.
이러한 결과를 Publish
합니다.
여기까지 하면, RabbitMQ랑 Iris서버랑 어떻게 메시지를 주고받는지 코드는 전부 살펴본 것 같습니다. 이전에 정리한 Iris 코드 구조를 첨부하였으니, 참고해주시길 바랍니다.